Documentation ¶
Index ¶
- Variables
- func WithCloseOutChan() func(*Queue) error
- func WithLimit(limit uint64) func(*Queue) error
- func WithTimeout(timeout time.Duration) func(*Queue) error
- type ChannelSinkGenerator
- type Queue
- func (q *Queue) CallbackWatch(matcher events.Matcher) (eventq chan events.Event, cancel func())
- func (q *Queue) CallbackWatchContext(ctx context.Context, matcher events.Matcher) (eventq chan events.Event)
- func (q *Queue) Close() error
- func (q *Queue) Publish(item events.Event)
- func (q *Queue) Watch() (eventq chan events.Event, cancel func())
- func (q *Queue) WatchContext(ctx context.Context) (eventq chan events.Event)
- type TimeoutDropErrChanGen
Constants ¶
This section is empty.
Variables ¶
var ErrSinkTimeout = fmt.Errorf("timeout exceeded, tearing down sink")
ErrSinkTimeout is returned from the Write method when a sink times out.
Functions ¶
func WithCloseOutChan ¶
WithCloseOutChan returns a functional option for a queue whose watcher channel is closed when no more events are expected to be sent to the watcher.
Types ¶
type ChannelSinkGenerator ¶
type ChannelSinkGenerator interface {
NewChannelSink() (events.Sink, *events.Channel)
}
ChannelSinkGenerator is a constructor of sinks that eventually lead to a channel.
func NewTimeoutDropErrSinkGen ¶
func NewTimeoutDropErrSinkGen(timeout time.Duration) ChannelSinkGenerator
NewTimeoutDropErrSinkGen returns a generator of timeoutSinks wrapping dropErrClosed sinks, wrapping unbuffered channel sinks.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the structure used to publish events and watch for them.
func NewQueue ¶
NewQueue creates a new publish/subscribe queue which supports watchers. The channels that it will create for subscriptions will have the buffer size specified by buffer.
func (*Queue) CallbackWatch ¶
func (q *Queue) CallbackWatch(matcher events.Matcher) (eventq chan events.Event, cancel func())
CallbackWatch returns a channel which will receive all events published to the queue from this point that pass the check in the provided callback function. The returned cancel function will stop the flow of events and close the channel.
func (*Queue) CallbackWatchContext ¶
func (q *Queue) CallbackWatchContext(ctx context.Context, matcher events.Matcher) (eventq chan events.Event)
CallbackWatchContext returns a channel where all items published to the queue will be received. The channel will be closed when the provided context is cancelled.
func (*Queue) Publish ¶
func (q *Queue) Publish(item events.Event)
Publish adds an item to the queue.
func (*Queue) Watch ¶
func (q *Queue) Watch() (eventq chan events.Event, cancel func())
Watch returns a channel which will receive all items published to the queue from this point, until cancel is called.
func (*Queue) WatchContext ¶
WatchContext returns a channel where all items published to the queue will be received. The channel will be closed when the provided context is cancelled.
type TimeoutDropErrChanGen ¶
type TimeoutDropErrChanGen struct {
// contains filtered or unexported fields
}
TimeoutDropErrChanGen is a ChannelSinkGenerator that creates a channel, wrapped by the dropErrClosed sink and a timeout.
func (*TimeoutDropErrChanGen) NewChannelSink ¶
func (s *TimeoutDropErrChanGen) NewChannelSink() (events.Sink, *events.Channel)
NewChannelSink creates a new sink chain of timeoutSink->dropErrClosed->Channel