Documentation ¶
Overview ¶
Package broker implements types used for routing inputs to outputs in non-trivial arrangements, such as fan-out or fan-in models.
Index ¶
- func OptDynamicFanInSetOnAdd(onAddFunc func(label string)) func(*DynamicFanIn)
- func OptDynamicFanInSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanIn)
- func OptDynamicFanOutSetOnAdd(onAddFunc func(label string)) func(*DynamicFanOut)
- func OptDynamicFanOutSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanOut)
- type DynamicFanIn
- func (d *DynamicFanIn) CloseAsync()
- func (d *DynamicFanIn) MessageChan() <-chan types.Message
- func (d *DynamicFanIn) SetInput(ident string, input DynamicInput, timeout time.Duration) error
- func (d *DynamicFanIn) StartListening(responseChan <-chan types.Response) error
- func (d *DynamicFanIn) WaitForClose(timeout time.Duration) error
- type DynamicFanOut
- func (d *DynamicFanOut) CloseAsync()
- func (d *DynamicFanOut) ResponseChan() <-chan types.Response
- func (d *DynamicFanOut) SetOutput(ident string, output DynamicOutput, timeout time.Duration) error
- func (d *DynamicFanOut) StartReceiving(msgs <-chan types.Message) error
- func (d *DynamicFanOut) WaitForClose(timeout time.Duration) error
- type DynamicInput
- type DynamicOutput
- type FanIn
- type FanOut
- type FanOutConfig
- type MockType
- type RoundRobin
- type Type
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OptDynamicFanInSetOnAdd ¶ added in v0.8.0
func OptDynamicFanInSetOnAdd(onAddFunc func(label string)) func(*DynamicFanIn)
OptDynamicFanInSetOnAdd sets the function that is called whenever a dynamic input is added.
func OptDynamicFanInSetOnRemove ¶ added in v0.8.0
func OptDynamicFanInSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanIn)
OptDynamicFanInSetOnRemove sets the function that is called whenever a dynamic input is removed.
func OptDynamicFanOutSetOnAdd ¶ added in v0.8.0
func OptDynamicFanOutSetOnAdd(onAddFunc func(label string)) func(*DynamicFanOut)
OptDynamicFanOutSetOnAdd sets the function that is called whenever a dynamic output is added.
func OptDynamicFanOutSetOnRemove ¶ added in v0.8.0
func OptDynamicFanOutSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanOut)
OptDynamicFanOutSetOnRemove sets the function that is called whenever a dynamic output is removed.
Types ¶
type DynamicFanIn ¶ added in v0.8.0
type DynamicFanIn struct {
// contains filtered or unexported fields
}
DynamicFanIn is a broker that implements types.Producer and manages a map of inputs to unique string identifiers, routing them through a single message channel. Inputs can be added and removed dynamically as the broker runs.
func NewDynamicFanIn ¶ added in v0.8.0
func NewDynamicFanIn( inputs map[string]DynamicInput, logger log.Modular, stats metrics.Type, options ...func(*DynamicFanIn), ) (*DynamicFanIn, error)
NewDynamicFanIn creates a new DynamicFanIn type by providing an initial map map of inputs.
func (*DynamicFanIn) CloseAsync ¶ added in v0.8.0
func (d *DynamicFanIn) CloseAsync()
CloseAsync shuts down the DynamicFanIn broker and stops processing requests.
func (*DynamicFanIn) MessageChan ¶ added in v0.8.0
func (d *DynamicFanIn) MessageChan() <-chan types.Message
MessageChan returns the channel used for consuming messages from this broker.
func (*DynamicFanIn) SetInput ¶ added in v0.8.0
func (d *DynamicFanIn) SetInput(ident string, input DynamicInput, timeout time.Duration) error
SetInput attempts to add a new input to the dynamic input broker. If an input already exists with the same identifier it will be closed and removed. If either action takes longer than the timeout period an error will be returned.
A nil input is safe and will simply remove the previous input under the indentifier, if there was one.
func (*DynamicFanIn) StartListening ¶ added in v0.8.0
func (d *DynamicFanIn) StartListening(responseChan <-chan types.Response) error
StartListening assigns a new responses channel for the broker to read.
func (*DynamicFanIn) WaitForClose ¶ added in v0.8.0
func (d *DynamicFanIn) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the DynamicFanIn broker has closed down.
type DynamicFanOut ¶ added in v0.8.0
type DynamicFanOut struct {
// contains filtered or unexported fields
}
DynamicFanOut is a broker that implements types.Consumer and broadcasts each message out to a dynamic map of outputs.
func NewDynamicFanOut ¶ added in v0.8.0
func NewDynamicFanOut( outputs map[string]DynamicOutput, logger log.Modular, stats metrics.Type, options ...func(*DynamicFanOut), ) (*DynamicFanOut, error)
NewDynamicFanOut creates a new DynamicFanOut type by providing outputs.
func (*DynamicFanOut) CloseAsync ¶ added in v0.8.0
func (d *DynamicFanOut) CloseAsync()
CloseAsync shuts down the DynamicFanOut broker and stops processing requests.
func (*DynamicFanOut) ResponseChan ¶ added in v0.8.0
func (d *DynamicFanOut) ResponseChan() <-chan types.Response
ResponseChan returns the response channel.
func (*DynamicFanOut) SetOutput ¶ added in v0.8.0
func (d *DynamicFanOut) SetOutput(ident string, output DynamicOutput, timeout time.Duration) error
SetOutput attempts to add a new output to the dynamic output broker. If an output already exists with the same identifier it will be closed and removed. If either action takes longer than the timeout period an error will be returned.
A nil output argument is safe and will simply remove the previous output under the indentifier, if there was one.
func (*DynamicFanOut) StartReceiving ¶ added in v0.8.0
func (d *DynamicFanOut) StartReceiving(msgs <-chan types.Message) error
StartReceiving assigns a new messages channel for the broker to read.
func (*DynamicFanOut) WaitForClose ¶ added in v0.8.0
func (d *DynamicFanOut) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the DynamicFanOut broker has closed down.
type DynamicInput ¶ added in v0.8.0
type DynamicInput interface { types.MessageSender types.ResponderListener types.Closable }
DynamicInput is an interface of input types that must be closable.
type DynamicOutput ¶ added in v0.8.0
DynamicOutput is an interface of output types that must be closable.
type FanIn ¶
type FanIn struct {
// contains filtered or unexported fields
}
FanIn is a broker that implements types.Producer, takes an array of inputs and routes them through a single message channel.
func (*FanIn) CloseAsync ¶
func (i *FanIn) CloseAsync()
CloseAsync shuts down the FanIn broker and stops processing requests.
func (*FanIn) MessageChan ¶
MessageChan returns the channel used for consuming messages from this broker.
func (*FanIn) StartListening ¶
StartListening assigns a new responses channel for the broker to read.
type FanOut ¶
type FanOut struct {
// contains filtered or unexported fields
}
FanOut is a broker that implements types.Consumer and broadcasts each message out to an array of outputs.
func NewFanOut ¶
func NewFanOut( conf FanOutConfig, outputs []types.Consumer, logger log.Modular, stats metrics.Type, ) (*FanOut, error)
NewFanOut creates a new FanOut type by providing outputs.
func (*FanOut) CloseAsync ¶
func (o *FanOut) CloseAsync()
CloseAsync shuts down the FanOut broker and stops processing requests.
func (*FanOut) ResponseChan ¶
ResponseChan returns the response channel.
func (*FanOut) StartReceiving ¶
StartReceiving assigns a new messages channel for the broker to read.
type FanOutConfig ¶ added in v0.1.1
type FanOutConfig struct { }
FanOutConfig is config values for the fan out type.
func NewFanOutConfig ¶ added in v0.1.1
func NewFanOutConfig() FanOutConfig
NewFanOutConfig creates a FanOutConfig fully populated with default values.
type RoundRobin ¶
type RoundRobin struct {
// contains filtered or unexported fields
}
RoundRobin is a broker that implements types.Consumer and sends each message out to a single consumer chosen from an array in round-robin fashion. Consumers that apply backpressure will block all consumers.
func NewRoundRobin ¶
NewRoundRobin creates a new RoundRobin type by providing consumers.
func (*RoundRobin) CloseAsync ¶
func (o *RoundRobin) CloseAsync()
CloseAsync shuts down the RoundRobin broker and stops processing requests.
func (*RoundRobin) ResponseChan ¶
func (o *RoundRobin) ResponseChan() <-chan types.Response
ResponseChan returns the response channel.
func (*RoundRobin) StartReceiving ¶
func (o *RoundRobin) StartReceiving(msgs <-chan types.Message) error
StartReceiving assigns a new messages channel for the broker to read.
func (*RoundRobin) WaitForClose ¶
func (o *RoundRobin) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the RoundRobin broker has closed down.