Documentation ¶
Index ¶
- func ContextWithBus(ctx context.Context, bus *Bus) context.Context
- func FilterFmt(kind string, labels ...Label) string
- type Bus
- func (b *Bus) DisableBufferPublication()
- func (b *Bus) EnableBufferPublication(capacity int32)
- func (b *Bus) Name() string
- func (b *Bus) Pub(v Messager, labels ...Label)
- func (b *Bus) SetDefaultSubscriptionQueueSize(i uint64)
- func (b *Bus) SetDrainChanDuration(duration time.Duration)
- func (b *Bus) SetPanicOnFullQueue(graceTime time.Duration)
- func (b *Bus) Start(ctx context.Context)
- func (b *Bus) Stop()
- func (b *Bus) Sub(name string, options ...interface{}) *Subscription
- type ErrSubscriptionIDNotFound
- type Label
- type Labels
- type Messager
- type Msg
- type QueueSizer
- type Subscription
- type SubscriptionError
- type SubscriptionQueueThreshold
- type Timeout
- type Timeouter
- type WithQueueSize
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContextWithBus ¶
func ContextWithBus(ctx context.Context, bus *Bus) context.Context
ContextWithBus stores the bus in the context and returns the new context.
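Carrying a value through a context.Context is usually done with an unexported key type and context.WithValue. The following is a minimal sketch of that pattern, not the package's actual code: the Bus type is a stand-in, and busFromContext and roundTrip are hypothetical helpers (the signature of the real BusFromContext is not shown in this documentation).

```go
package main

import (
	"context"
	"fmt"
)

// Bus is a stand-in for the package's Bus type, for illustration only.
type Bus struct{ name string }

// busKey is an unexported key type, so other packages cannot collide with it.
type busKey struct{}

// contextWithBus mirrors the documented ContextWithBus: it stores the bus
// in the context and returns the new context.
func contextWithBus(ctx context.Context, bus *Bus) context.Context {
	return context.WithValue(ctx, busKey{}, bus)
}

// busFromContext is a hypothetical counterpart. It returns nil when no bus
// has been stored in ctx.
func busFromContext(ctx context.Context) *Bus {
	bus, _ := ctx.Value(busKey{}).(*Bus)
	return bus
}

// roundTrip stores a bus in a fresh context and reads its name back.
func roundTrip(name string) string {
	ctx := contextWithBus(context.Background(), &Bus{name: name})
	return busFromContext(ctx).name
}

func main() {
	fmt.Println(roundTrip("daemon"))                         // daemon
	fmt.Println(busFromContext(context.Background()) == nil) // true
}
```

The unexported key type is the design point here: only this package can read or overwrite the stored bus.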
Types ¶
type Bus ¶
func BusFromContext ¶
func (*Bus) DisableBufferPublication ¶
func (b *Bus) DisableBufferPublication()
DisableBufferPublication disables publication buffering. It dequeues the publication buffer channel for retransmission. The publication buffer channel is then closed and new publications are delivered immediately.
The pubsub default behavior is unbuffered.
func (*Bus) EnableBufferPublication ¶
func (b *Bus) EnableBufferPublication(capacity int32)
EnableBufferPublication enables publication buffering. Future publication commands are pushed to a fresh buffered channel of cmdPub with the given capacity, instead of being delivered immediately.
The pubsub default behavior is unbuffered.
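The enable/disable cycle described above can be pictured with a small single-goroutine model. This is only a sketch under stated assumptions: bufferingBus, its string messages and the delivered slice are all hypothetical, and the real bus works with cmdPub values and its own goroutines.

```go
package main

import "fmt"

// bufferingBus is a hypothetical, single-goroutine stand-in for the real Bus.
type bufferingBus struct {
	buffer    chan string // nil while buffering is disabled
	delivered []string    // messages handed to subscribers, in order
}

// enableBuffer routes future publications to a fresh buffered channel.
func (b *bufferingBus) enableBuffer(capacity int) {
	b.buffer = make(chan string, capacity)
}

// pub queues the message when buffering is enabled, else delivers it now.
func (b *bufferingBus) pub(msg string) {
	if b.buffer != nil {
		b.buffer <- msg // in this sketch, blocks if the buffer is full
		return
	}
	b.delivered = append(b.delivered, msg)
}

// disableBuffer retransmits the queued messages, closes the buffer channel
// and returns to immediate delivery.
func (b *bufferingBus) disableBuffer() {
	if b.buffer == nil {
		return
	}
	for len(b.buffer) > 0 {
		b.delivered = append(b.delivered, <-b.buffer)
	}
	close(b.buffer)
	b.buffer = nil
}

func main() {
	b := &bufferingBus{}
	b.pub("a") // unbuffered by default: delivered immediately
	b.enableBuffer(4)
	b.pub("b")
	b.pub("c")
	fmt.Println(b.delivered) // [a]
	b.disableBuffer()
	b.pub("d")
	fmt.Println(b.delivered) // [a b c d]
}
```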
func (*Bus) Pub ¶
func (b *Bus) Pub(v Messager, labels ...Label)
Pub posts a new Publication on the bus. The labels are added to the existing labels of v, so a subscriber can retrieve the publication labels from the received message.
func (*Bus) SetDefaultSubscriptionQueueSize ¶
func (b *Bus) SetDefaultSubscriptionQueueSize(i uint64)
SetDefaultSubscriptionQueueSize overrides the default subscriber queue size for a bus that has not been started yet.
It panics if called on a started bus.
func (*Bus) SetDrainChanDuration ¶
func (b *Bus) SetDrainChanDuration(duration time.Duration)
SetDrainChanDuration overrides defaultDrainChanDuration for a bus that has not been started yet.
It panics if called on a started bus.
func (*Bus) SetPanicOnFullQueue ¶
func (b *Bus) SetPanicOnFullQueue(graceTime time.Duration)
SetPanicOnFullQueue enables a panic, after graceTime, when a subscription with no timeout has reached its maximum queue size. A zero graceTime disables the panic-on-full-queue feature.
It panics if called on a started bus.
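The configuration setters above share one rule: they are only valid before the bus is started. A minimal sketch of such a guard follows, assuming a hypothetical guardedBus type with a started flag. The real Bus may track its state differently, and the value 4000 is an arbitrary illustration.

```go
package main

import "fmt"

// guardedBus is a hypothetical stand-in for the real Bus.
type guardedBus struct {
	started          bool
	defaultQueueSize uint64
}

// start marks the bus as running; configuration is frozen from here on.
func (b *guardedBus) start() { b.started = true }

// setDefaultQueueSize refuses to change configuration once the bus runs.
func (b *guardedBus) setDefaultQueueSize(n uint64) {
	if b.started {
		panic("setDefaultQueueSize called on a started bus")
	}
	b.defaultQueueSize = n
}

// panics reports whether f panicked.
func panics(f func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	f()
	return false
}

func main() {
	b := &guardedBus{}
	b.setDefaultQueueSize(4000) // fine: not started yet
	b.start()
	fmt.Println(panics(func() { b.setDefaultQueueSize(8000) })) // true
	fmt.Println(b.defaultQueueSize)                             // 4000
}
```

In practice this means all Set* calls belong between creating the bus and calling Start.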
func (*Bus) Sub ¶
func (b *Bus) Sub(name string, options ...interface{}) *Subscription
Sub requests a new Subscription named "name" on the bus.
The non-empty string <name> parameter is used to compute the subscription family (the first field of name). Example: with name "daemon.imon foo@node1", the family is "daemon.imon". The function panics if name is empty.
Used options: Timeouter, QueueSizer
- Timeouter: sets the subscriber timeout to pull each message. A subscriber whose notification exceeds the timeout is automatically dropped, and a SubscriptionError message is sent on the bus. The default is no timeout.
- QueueSizer: sets the subscriber queue size. The default value is bus dependent (see SetDefaultSubscriptionQueueSize).
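The family rule and the option handling above can be sketched as follows. Everything here is an assumption made for illustration: fields are taken to be whitespace-separated (as the example name suggests), and timeouter, queueSizer, withTimeout, withQueueSize, subConfig and applyOptions are hypothetical stand-ins, since the methods of the real Timeouter and QueueSizer interfaces are unexported.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// family returns the first whitespace-separated field of name.
// Like the documented Sub, it panics on an empty name.
func family(name string) string {
	fields := strings.Fields(name)
	if len(fields) == 0 {
		panic("empty subscription name")
	}
	return fields[0]
}

// timeouter and queueSizer are hypothetical stand-ins for the package's
// Timeouter and QueueSizer interfaces.
type timeouter interface{ timeout() time.Duration }
type queueSizer interface{ queueSize() uint64 }

type withTimeout time.Duration

func (t withTimeout) timeout() time.Duration { return time.Duration(t) }

type withQueueSize uint64

func (q withQueueSize) queueSize() uint64 { return uint64(q) }

// subConfig holds the settings derived from the options.
type subConfig struct {
	timeout   time.Duration // zero means no timeout
	queueSize uint64
}

// applyOptions starts from the bus default queue size and no timeout,
// then applies each recognized option. Unknown options are ignored.
func applyOptions(defaultQueueSize uint64, options ...interface{}) subConfig {
	cfg := subConfig{queueSize: defaultQueueSize}
	for _, opt := range options {
		switch o := opt.(type) {
		case timeouter:
			cfg.timeout = o.timeout()
		case queueSizer:
			cfg.queueSize = o.queueSize()
		}
	}
	return cfg
}

func main() {
	fmt.Println(family("daemon.imon foo@node1")) // daemon.imon
	cfg := applyOptions(4000, withTimeout(time.Second), withQueueSize(8))
	fmt.Println(cfg.timeout, cfg.queueSize) // 1s 8
}
```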
type ErrSubscriptionIDNotFound ¶
type ErrSubscriptionIDNotFound struct {
// contains filtered or unexported fields
}
func (ErrSubscriptionIDNotFound) Error ¶
func (e ErrSubscriptionIDNotFound) Error() string
type Labels ¶
Labels allow message routing filtering based on key/value matching
func (Labels) Keys ¶
Keys returns all the permutations of all lengths of the labels. Example:
keys of l1=foo l2=foo l3=foo:
{l1=foo} {l2=foo} {l3=foo}
{l1=foo}{l2=foo} {l1=foo}{l3=foo} {l2=foo}{l3=foo} {l2=foo}{l1=foo} {l3=foo}{l1=foo} {l3=foo}{l2=foo}
{l1=foo}{l2=foo}{l3=foo} {l1=foo}{l3=foo}{l2=foo} {l2=foo}{l1=foo}{l3=foo} {l2=foo}{l3=foo}{l1=foo} {l3=foo}{l1=foo}{l2=foo} {l3=foo}{l2=foo}{l1=foo}
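One way to enumerate such keys is a depth-first walk that extends each prefix with every label not yet used. The sketch below is an illustration, not the package's implementation: the label type and the keys function are hypothetical, and the output order differs from the listing above.

```go
package main

import "fmt"

// label is a hypothetical key/value pair standing in for the package's Label.
type label [2]string

// keys returns every ordered arrangement of every non-empty subset of labels,
// each rendered as a concatenation of {key=value} groups.
func keys(labels []label) []string {
	var out []string
	used := make([]bool, len(labels))
	var walk func(prefix string)
	walk = func(prefix string) {
		for i, l := range labels {
			if used[i] {
				continue
			}
			key := prefix + "{" + l[0] + "=" + l[1] + "}"
			out = append(out, key)
			used[i] = true
			walk(key) // extend this key with the remaining labels
			used[i] = false
		}
	}
	walk("")
	return out
}

func main() {
	ks := keys([]label{{"l1", "foo"}, {"l2", "foo"}, {"l3", "foo"}})
	fmt.Println(len(ks)) // 15
	for _, k := range ks {
		fmt.Println(k)
	}
}
```

For three labels this yields 3 + 6 + 6 = 15 keys, matching the listing. The count grows factorially with the number of labels.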
type QueueSizer ¶
type QueueSizer interface {
// contains filtered or unexported methods
}
type Subscription ¶
type Subscription struct {
	// C is the channel exposed to the subscriber for polling
	C chan any
	// contains filtered or unexported fields
}
func (*Subscription) AddFilter ¶
func (sub *Subscription) AddFilter(v any, labels ...Label)
func (*Subscription) DelFilter ¶
func (sub *Subscription) DelFilter(v any, labels ...Label)
func (*Subscription) Drain ¶
func (sub *Subscription) Drain()
Drain dequeues the exposed channel.
Drain is automatically called during sub.Stop().
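Draining a channel means reading and discarding whatever is queued so that pending senders are not left blocked. A minimal non-blocking sketch follows, assuming a hypothetical drain helper. The existence of SetDrainChanDuration suggests the real Drain is time-bounded, which this sketch does not model.

```go
package main

import "fmt"

// drain discards every value currently queued on c without blocking
// and reports how many values were dropped.
func drain(c chan any) int {
	n := 0
	for {
		select {
		case _, ok := <-c:
			if !ok {
				return n // channel is closed and empty
			}
			n++
		default:
			return n // nothing left to read right now
		}
	}
}

func main() {
	c := make(chan any, 3)
	c <- 1
	c <- "two"
	fmt.Println(drain(c), len(c)) // 2 0
}
```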
func (*Subscription) Start ¶
func (sub *Subscription) Start()
func (*Subscription) Stop ¶
func (sub *Subscription) Stop() error
Stop closes the subscription and dequeues the private and exposed subscription channels.
func (*Subscription) String ¶
func (sub *Subscription) String() string
type SubscriptionError ¶
type SubscriptionError struct {
	Msg
	ID   uuid.UUID `json:"id"`
	Name string    `json:"name"`
	ErrS string    `json:"error"`
}
SubscriptionError is a publication emitted when a subscriber notification exceeds its timeout.
func (SubscriptionError) Kind ¶
func (m SubscriptionError) Kind() string
type SubscriptionQueueThreshold ¶
type SubscriptionQueueThreshold struct {
	Msg
	ID   uuid.UUID
	Name string `json:"name"`
	// Count is the number of slots currently used in the internal subscriber queue
	Count uint64 `json:"count"`
	// From is the previous high threshold value
	From uint64 `json:"from"`
	// To is the new high threshold value
	To uint64 `json:"to"`
	// Limit is the maximum queue size
	Limit uint64 `json:"limit"`
}
SubscriptionQueueThreshold is a publication emitted when a subscriber queue reaches or leaves its current high threshold value.
func (SubscriptionQueueThreshold) Kind ¶
func (m SubscriptionQueueThreshold) Kind() string
type WithQueueSize ¶
type WithQueueSize uint64