Documentation ¶
Index ¶
- Constants
- func LoadAdaptiveRateLimiterConfig(cfg *AdaptiveRateLimiterConfig, cfgmap map[interface{}]interface{}) error
- func NewAdaptiveRateLimitTracker(config AdaptiveRateLimiterConfig, registry metrics.Registry, ...) rate.AdaptiveRateLimitTracker
- func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, ...) rate.AdaptiveRateLimiter
- func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, ...) rate.RateLimiter
- func WasRateLimited(err error) bool
- type AdaptiveRateLimiterConfig
- type Command
- type CreateEntityCommand
- type Decoder
- type DecoderF
- type Decoders
- type DefaultRateLimiter
- type DeleteEntityCommand
- type Dispatcher
- type EntityCreator
- type EntityDeleter
- type EntityManager
- type EntityMarshaller
- type EntityUpdater
- type LocalDispatcher
- func (self *LocalDispatcher) Bootstrap() error
- func (self *LocalDispatcher) Dispatch(command Command) error
- func (self *LocalDispatcher) GetPeers() map[string]channel.Channel
- func (self *LocalDispatcher) GetRateLimiter() rate.RateLimiter
- func (self *LocalDispatcher) IsLeader() bool
- func (self *LocalDispatcher) IsLeaderOrLeaderless() bool
- func (self *LocalDispatcher) IsLeaderless() bool
- type NoOpAdaptiveRateLimitTracker
- type NoOpAdaptiveRateLimiter
- type NoOpRateLimiter
- type RateLimiterConfig
- type SyncSnapshotCommand
- type UpdateEntityCommand
- type Validatable
Constants ¶
const ( MetricCommandLimiterCurrentQueuedCount = "command.limiter.queued_count" MetricCommandLimiterWorkTimer = "command.limiter.work_timer" DefaultLimiterSize = 100 MinLimiterSize = 10 DefaultAdaptiveRateLimiterEnabled = true DefaultAdaptiveRateLimiterMinWindowSize = 5 DefaultAdaptiveRateLimiterMaxWindowSize = 250 DefaultAdaptiveRateLimiterTimeout = 30 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func LoadAdaptiveRateLimiterConfig ¶ added in v0.33.0
func LoadAdaptiveRateLimiterConfig(cfg *AdaptiveRateLimiterConfig, cfgmap map[interface{}]interface{}) error
func NewAdaptiveRateLimitTracker ¶ added in v0.33.0
func NewAdaptiveRateLimitTracker(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimitTracker
func NewAdaptiveRateLimiter ¶ added in v0.32.0
func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimiter
func NewRateLimiter ¶ added in v0.31.0
func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.RateLimiter
func WasRateLimited ¶ added in v0.32.2
func WasRateLimited(err error) bool
Types ¶
type AdaptiveRateLimiterConfig ¶ added in v0.32.0
type AdaptiveRateLimiterConfig struct { // Enabled - if false, a no-op rate limiter will be created, which doesn't enforce any rate limiting Enabled bool // MaxSize - the maximum window size to allow MaxSize uint32 // MinSize - the smallest window size to allow MinSize uint32 // WorkTimerMetric - the name of the timer metric for timing how long operations take to execute WorkTimerMetric string // QueueSizeMetric - the name of the gauge metric showing the current number of operations queued QueueSizeMetric string // WindowSizeMetric - the name of the metric showing the current window size WindowSizeMetric string // Timeout - only used for AdaptiveRateLimitTracker, sets when a piece of outstanding work will be assumed to // have failed if it hasn't been marked completed yet, so that work slots aren't lost Timeout time.Duration }
AdaptiveRateLimiterConfig contains configuration values used to create a new AdaptiveRateLimiter
func (*AdaptiveRateLimiterConfig) SetDefaults ¶ added in v0.33.0
func (self *AdaptiveRateLimiterConfig) SetDefaults()
type Command ¶
type Command interface { // Apply runs the command Apply(ctx boltz.MutateContext) error // GetChangeContext returns the change context associated with the command GetChangeContext() *change.Context // Encode returns a serialized representation of the command Encode() ([]byte, error) }
Command instances represent actions to be taken by the fabric controller. They are serializable, so they can be shipped from one controller to another for RAFT coordination
type CreateEntityCommand ¶
type CreateEntityCommand[T models.Entity] struct { Context *change.Context Creator EntityCreator[T] Entity T PostCreateHook func(ctx boltz.MutateContext, entity T) error Flags uint32 }
func (*CreateEntityCommand[T]) Apply ¶
func (self *CreateEntityCommand[T]) Apply(ctx boltz.MutateContext) error
func (*CreateEntityCommand[T]) Encode ¶
func (self *CreateEntityCommand[T]) Encode() ([]byte, error)
func (*CreateEntityCommand[T]) GetChangeContext ¶
func (self *CreateEntityCommand[T]) GetChangeContext() *change.Context
type Decoders ¶
type Decoders interface { Register(id int32, decoder Decoder) RegisterF(id int32, decoder DecoderF) Decode(data []byte) (Command, error) GetDecoder(id int32) Decoder Clear() }
func GetDefaultDecoders ¶
func GetDefaultDecoders() Decoders
func NewDecoders ¶
func NewDecoders() Decoders
type DefaultRateLimiter ¶ added in v0.31.0
type DefaultRateLimiter struct {
// contains filtered or unexported fields
}
func (*DefaultRateLimiter) GetQueueFillPct ¶ added in v0.33.0
func (self *DefaultRateLimiter) GetQueueFillPct() float64
func (*DefaultRateLimiter) RunRateLimited ¶ added in v0.31.0
func (self *DefaultRateLimiter) RunRateLimited(f func() error) error
type DeleteEntityCommand ¶
type DeleteEntityCommand struct { Context *change.Context Deleter EntityDeleter Id string }
func (*DeleteEntityCommand) Apply ¶
func (self *DeleteEntityCommand) Apply(ctx boltz.MutateContext) error
func (*DeleteEntityCommand) Encode ¶
func (self *DeleteEntityCommand) Encode() ([]byte, error)
func (*DeleteEntityCommand) GetChangeContext ¶
func (self *DeleteEntityCommand) GetChangeContext() *change.Context
type Dispatcher ¶
type Dispatcher interface { Dispatch(command Command) error IsLeaderOrLeaderless() bool IsLeaderless() bool IsLeader() bool GetPeers() map[string]channel.Channel GetRateLimiter() rate.RateLimiter Bootstrap() error }
Dispatcher instances will take a command and either send it to the leader to be applied, or if the current system is the leader, apply it locally
type EntityCreator ¶
type EntityCreator[T models.Entity] interface { EntityMarshaller[T] // ApplyCreate creates the entity described by the given command ApplyCreate(cmd *CreateEntityCommand[T], ctx boltz.MutateContext) error }
EntityCreator instances can apply a create entity command to create entities of a given type
type EntityDeleter ¶
type EntityDeleter interface { GetEntityTypeId() string // ApplyDelete deletes the entity described by the given command ApplyDelete(cmd *DeleteEntityCommand, ctx boltz.MutateContext) error }
EntityDeleter instances can apply a delete entity command to delete entities of a given type
type EntityManager ¶
type EntityManager[T models.Entity] interface { EntityCreator[T] EntityUpdater[T] EntityDeleter }
EntityManager instances can handle creating, updating and deleting entities of a specific type
type EntityMarshaller ¶
type EntityMarshaller[T any] interface { // GetEntityTypeId returns the entity type id. This is distinct from the Store entity id // which may be shared by types, such as service and router. The entity type is unique // for each type GetEntityTypeId() string // Marshall marshals the entity to a bytes encoded format Marshall(entity T) ([]byte, error) // Unmarshall unmarshalls the bytes back into an entity Unmarshall(bytes []byte) (T, error) }
EntityMarshaller instances can marshal and unmarshal entities of the type that they manage as well as knowing their entity type
type EntityUpdater ¶
type EntityUpdater[T models.Entity] interface { EntityMarshaller[T] // ApplyUpdate updates the entity described by the given command ApplyUpdate(cmd *UpdateEntityCommand[T], ctx boltz.MutateContext) error }
EntityUpdater instances can apply an update entity command to update entities of a given type
type LocalDispatcher ¶
type LocalDispatcher struct { EncodeDecodeCommands bool Limiter rate.RateLimiter }
LocalDispatcher should be used when running a non-clustered system
func (*LocalDispatcher) Bootstrap ¶ added in v1.2.0
func (self *LocalDispatcher) Bootstrap() error
func (*LocalDispatcher) Dispatch ¶
func (self *LocalDispatcher) Dispatch(command Command) error
func (*LocalDispatcher) GetPeers ¶
func (self *LocalDispatcher) GetPeers() map[string]channel.Channel
func (*LocalDispatcher) GetRateLimiter ¶ added in v0.33.0
func (self *LocalDispatcher) GetRateLimiter() rate.RateLimiter
func (*LocalDispatcher) IsLeader ¶ added in v1.2.1
func (self *LocalDispatcher) IsLeader() bool
func (*LocalDispatcher) IsLeaderOrLeaderless ¶
func (self *LocalDispatcher) IsLeaderOrLeaderless() bool
func (*LocalDispatcher) IsLeaderless ¶ added in v1.1.8
func (self *LocalDispatcher) IsLeaderless() bool
type NoOpAdaptiveRateLimitTracker ¶ added in v0.33.0
type NoOpAdaptiveRateLimitTracker struct{}
func (NoOpAdaptiveRateLimitTracker) IsRateLimited ¶ added in v0.33.0
func (n NoOpAdaptiveRateLimitTracker) IsRateLimited() bool
func (NoOpAdaptiveRateLimitTracker) RunRateLimited ¶ added in v0.33.0
func (n NoOpAdaptiveRateLimitTracker) RunRateLimited(string) (rate.RateLimitControl, error)
func (NoOpAdaptiveRateLimitTracker) RunRateLimitedF ¶ added in v0.33.0
func (n NoOpAdaptiveRateLimitTracker) RunRateLimitedF(_ string, f func(control rate.RateLimitControl) error) error
type NoOpAdaptiveRateLimiter ¶ added in v0.32.0
type NoOpAdaptiveRateLimiter struct{}
func (NoOpAdaptiveRateLimiter) RunRateLimited ¶ added in v0.32.0
func (self NoOpAdaptiveRateLimiter) RunRateLimited(f func() error) (rate.RateLimitControl, error)
type NoOpRateLimiter ¶ added in v0.31.0
type NoOpRateLimiter struct{}
func (NoOpRateLimiter) GetQueueFillPct ¶ added in v0.33.0
func (self NoOpRateLimiter) GetQueueFillPct() float64
func (NoOpRateLimiter) RunRateLimited ¶ added in v0.31.0
func (self NoOpRateLimiter) RunRateLimited(f func() error) error
type RateLimiterConfig ¶ added in v0.31.0
type SyncSnapshotCommand ¶
type SyncSnapshotCommand struct { SnapshotId string Snapshot []byte SnapshotSink func(cmd *SyncSnapshotCommand) error }
func (*SyncSnapshotCommand) Apply ¶
func (self *SyncSnapshotCommand) Apply(boltz.MutateContext) error
func (*SyncSnapshotCommand) Encode ¶
func (self *SyncSnapshotCommand) Encode() ([]byte, error)
func (*SyncSnapshotCommand) GetChangeContext ¶
func (self *SyncSnapshotCommand) GetChangeContext() *change.Context
type UpdateEntityCommand ¶
type UpdateEntityCommand[T models.Entity] struct { Context *change.Context Updater EntityUpdater[T] Entity T UpdatedFields fields.UpdatedFields Flags uint32 }
func (*UpdateEntityCommand[T]) Apply ¶
func (self *UpdateEntityCommand[T]) Apply(ctx boltz.MutateContext) error
func (*UpdateEntityCommand[T]) Encode ¶
func (self *UpdateEntityCommand[T]) Encode() ([]byte, error)
func (*UpdateEntityCommand[T]) GetChangeContext ¶
func (self *UpdateEntityCommand[T]) GetChangeContext() *change.Context
type Validatable ¶
type Validatable interface {
Validate() error
}
Validatable instances can be validated. Command instances which implement Validatable will be validated before Command.Apply is called