Documentation ¶
Overview ¶
Package broker implements types used for routing inputs to outputs in non-trivial arrangements, such as fan-out or fan-in models.
Index ¶
- func ComplementGenericConfig(target, complement interface{}) error
- func GetGenericType(boxedConfig interface{}) string
- func OptDynamicFanInSetOnAdd(onAddFunc func(label string)) func(*DynamicFanIn)
- func OptDynamicFanInSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanIn)
- func OptDynamicFanOutSetOnAdd(onAddFunc func(label string)) func(*DynamicFanOut)
- func OptDynamicFanOutSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanOut)
- func RemoveGenericType(boxedConfig interface{})
- type DynamicFanIn
- func (d *DynamicFanIn) CloseAsync()
- func (d *DynamicFanIn) Connected() bool
- func (d *DynamicFanIn) SetInput(ident string, input DynamicInput, timeout time.Duration) error
- func (d *DynamicFanIn) TransactionChan() <-chan types.Transaction
- func (d *DynamicFanIn) WaitForClose(timeout time.Duration) error
- type DynamicFanOut
- func (d *DynamicFanOut) CloseAsync()
- func (d *DynamicFanOut) Connected() bool
- func (d *DynamicFanOut) Consume(transactions <-chan types.Transaction) error
- func (d *DynamicFanOut) MaxInFlight() (int, bool)
- func (d *DynamicFanOut) SetOutput(ident string, output DynamicOutput, timeout time.Duration) error
- func (d *DynamicFanOut) WaitForClose(timeout time.Duration) error
- func (d *DynamicFanOut) WithMaxInFlight(i int) *DynamicFanOut
- type DynamicInput
- type DynamicOutput
- type FanIn
- type FanOut
- type FanOutSequential
- func (o *FanOutSequential) CloseAsync()
- func (o *FanOutSequential) Connected() bool
- func (o *FanOutSequential) Consume(transactions <-chan types.Transaction) error
- func (o *FanOutSequential) MaxInFlight() (int, bool)
- func (o *FanOutSequential) WaitForClose(timeout time.Duration) error
- func (o *FanOutSequential) WithMaxInFlight(i int) *FanOutSequential
- type Greedy
- type MockType
- type RoundRobin
- type Try
- func (t *Try) CloseAsync()
- func (t *Try) Connected() bool
- func (t *Try) Consume(ts <-chan types.Transaction) error
- func (t *Try) MaxInFlight() (int, bool)
- func (t *Try) WaitForClose(timeout time.Duration) error
- func (t *Try) WithMaxInFlight(i int) *Try
- func (t *Try) WithOutputMetricsPrefix(prefix string) *Try
Functions ¶
func ComplementGenericConfig ¶
func ComplementGenericConfig(target, complement interface{}) error
ComplementGenericConfig copies fields from one generic config to another, but avoids overriding existing values in the destination config.
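The "copy without overriding" behaviour can be illustrated on plain maps. The following is a minimal sketch and not the package's implementation: complementMap is a hypothetical helper, and it assumes both configs are map[string]interface{} values whose nested maps are merged recursively.

```go
package main

import "fmt"

// complementMap is a hypothetical helper (not part of the broker package).
// It copies keys from complement into target only when target lacks them,
// recursing into nested maps so existing leaf values are never overwritten.
func complementMap(target, complement map[string]interface{}) {
	for key, val := range complement {
		existing, ok := target[key]
		if !ok {
			target[key] = val
			continue
		}
		tMap, tIsMap := existing.(map[string]interface{})
		cMap, cIsMap := val.(map[string]interface{})
		if tIsMap && cIsMap {
			complementMap(tMap, cMap)
		}
	}
}

func main() {
	target := map[string]interface{}{
		"type":  "kafka",
		"kafka": map[string]interface{}{"topic": "foo"},
	}
	defaults := map[string]interface{}{
		"type":  "stdout",
		"kafka": map[string]interface{}{"topic": "bar", "client_id": "baz"},
	}
	complementMap(target, defaults)
	fmt.Println(target["type"])  // the existing "kafka" value is kept
	fmt.Println(target["kafka"]) // "topic" is kept, "client_id" is filled in
}
```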
func GetGenericType ¶
func GetGenericType(boxedConfig interface{}) string
GetGenericType returns the type of a generically parsed config structure.
func OptDynamicFanInSetOnAdd ¶
func OptDynamicFanInSetOnAdd(onAddFunc func(label string)) func(*DynamicFanIn)
OptDynamicFanInSetOnAdd sets the function that is called whenever a dynamic input is added.
func OptDynamicFanInSetOnRemove ¶
func OptDynamicFanInSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanIn)
OptDynamicFanInSetOnRemove sets the function that is called whenever a dynamic input is removed.
func OptDynamicFanOutSetOnAdd ¶
func OptDynamicFanOutSetOnAdd(onAddFunc func(label string)) func(*DynamicFanOut)
OptDynamicFanOutSetOnAdd sets the function that is called whenever a dynamic output is added.
func OptDynamicFanOutSetOnRemove ¶
func OptDynamicFanOutSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanOut)
OptDynamicFanOutSetOnRemove sets the function that is called whenever a dynamic output is removed.
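The four Opt functions above follow the functional options pattern: each returns a function that mutates the broker when the constructor applies it. A minimal sketch of that pattern is shown below. The fanIn type and its add and remove methods are hypothetical stand-ins, not the package's types.

```go
package main

import "fmt"

// fanIn is a hypothetical stand-in for a broker with optional callbacks.
type fanIn struct {
	onAdd    func(label string)
	onRemove func(label string)
	inputs   map[string]struct{}
}

// optSetOnAdd mirrors the shape of an option constructor: it returns a
// function that mutates the broker when applied.
func optSetOnAdd(f func(label string)) func(*fanIn) {
	return func(d *fanIn) { d.onAdd = f }
}

// optSetOnRemove does the same for the removal callback.
func optSetOnRemove(f func(label string)) func(*fanIn) {
	return func(d *fanIn) { d.onRemove = f }
}

// newFanIn installs no-op callbacks first, then applies options in order.
func newFanIn(options ...func(*fanIn)) *fanIn {
	d := &fanIn{
		onAdd:    func(string) {},
		onRemove: func(string) {},
		inputs:   map[string]struct{}{},
	}
	for _, opt := range options {
		opt(d)
	}
	return d
}

// add registers a label and fires the add callback.
func (d *fanIn) add(label string) {
	d.inputs[label] = struct{}{}
	d.onAdd(label)
}

// remove fires the remove callback only if the label was registered.
func (d *fanIn) remove(label string) {
	if _, ok := d.inputs[label]; ok {
		delete(d.inputs, label)
		d.onRemove(label)
	}
}

func main() {
	d := newFanIn(
		optSetOnAdd(func(l string) { fmt.Println("added:", l) }),
		optSetOnRemove(func(l string) { fmt.Println("removed:", l) }),
	)
	d.add("foo")
	d.remove("foo")
}
```

Defaulting the callbacks to no-ops means the broker never needs a nil check before calling them.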
func RemoveGenericType ¶
func RemoveGenericType(boxedConfig interface{})
RemoveGenericType removes the type of a generically parsed config structure.
Types ¶
type DynamicFanIn ¶
type DynamicFanIn struct {
// contains filtered or unexported fields
}
DynamicFanIn is a broker that implements types.Producer and manages a map of inputs to unique string identifiers, routing them through a single message channel. Inputs can be added and removed dynamically as the broker runs.
func NewDynamicFanIn ¶
func NewDynamicFanIn(inputs map[string]DynamicInput, logger log.Modular, stats metrics.Type, options ...func(*DynamicFanIn)) (*DynamicFanIn, error)
NewDynamicFanIn creates a new DynamicFanIn type by providing an initial map of inputs.
func (*DynamicFanIn) CloseAsync ¶
func (d *DynamicFanIn) CloseAsync()
CloseAsync shuts down the DynamicFanIn broker and stops processing requests.
func (*DynamicFanIn) Connected ¶
func (d *DynamicFanIn) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*DynamicFanIn) SetInput ¶
func (d *DynamicFanIn) SetInput(ident string, input DynamicInput, timeout time.Duration) error
SetInput attempts to add a new input to the dynamic input broker. If an input already exists with the same identifier it will be closed and removed. If either action takes longer than the timeout period an error will be returned.
A nil input is safe and will simply remove the previous input under the identifier, if there was one.
func (*DynamicFanIn) TransactionChan ¶
func (d *DynamicFanIn) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this broker.
func (*DynamicFanIn) WaitForClose ¶
func (d *DynamicFanIn) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the DynamicFanIn broker has closed down.
type DynamicFanOut ¶
type DynamicFanOut struct {
// contains filtered or unexported fields
}
DynamicFanOut is a broker that implements types.Consumer and broadcasts each message out to a dynamic map of outputs.
func NewDynamicFanOut ¶
func NewDynamicFanOut(outputs map[string]DynamicOutput, logger log.Modular, stats metrics.Type, options ...func(*DynamicFanOut)) (*DynamicFanOut, error)
NewDynamicFanOut creates a new DynamicFanOut type by providing outputs.
func (*DynamicFanOut) CloseAsync ¶
func (d *DynamicFanOut) CloseAsync()
CloseAsync shuts down the DynamicFanOut broker and stops processing requests.
func (*DynamicFanOut) Connected ¶
func (d *DynamicFanOut) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*DynamicFanOut) Consume ¶
func (d *DynamicFanOut) Consume(transactions <-chan types.Transaction) error
Consume assigns a new transactions channel for the broker to read.
func (*DynamicFanOut) MaxInFlight ¶ added in v3.45.1
func (d *DynamicFanOut) MaxInFlight() (int, bool)
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
func (*DynamicFanOut) SetOutput ¶
func (d *DynamicFanOut) SetOutput(ident string, output DynamicOutput, timeout time.Duration) error
SetOutput attempts to add a new output to the dynamic output broker. If an output already exists with the same identifier it will be closed and removed. If either action takes longer than the timeout period an error will be returned.
A nil output argument is safe and will simply remove the previous output under the identifier, if there was one.
TODO: V4 use context here instead.
func (*DynamicFanOut) WaitForClose ¶
func (d *DynamicFanOut) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the DynamicFanOut broker has closed down.
func (*DynamicFanOut) WithMaxInFlight ¶ added in v3.26.0
func (d *DynamicFanOut) WithMaxInFlight(i int) *DynamicFanOut
WithMaxInFlight sets the maximum number of in-flight messages this broker supports. This must be set before calling Consume.
type DynamicInput ¶
DynamicInput is an interface of input types that must be closable.
type DynamicOutput ¶
DynamicOutput is an interface of output types that must be closable.
type FanIn ¶
type FanIn struct {
// contains filtered or unexported fields
}
FanIn is a broker that implements types.Producer, takes an array of inputs and routes them through a single message channel.
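The fan-in idea, routing several inputs through one channel, can be sketched with plain channels and goroutines. This is an illustration of the general technique, not the FanIn type itself. The fanIn function and its use of string messages are assumptions made for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn forwards everything from the given input channels onto one output
// channel, which is closed once every input has been closed and drained.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan string) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a, b := make(chan string), make(chan string)
	go func() { a <- "from a"; close(a) }()
	go func() { b <- "from b"; close(b) }()
	for msg := range fanIn(a, b) {
		fmt.Println(msg) // arrival order between inputs is not deterministic
	}
}
```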
func (*FanIn) CloseAsync ¶
func (i *FanIn) CloseAsync()
CloseAsync shuts down the FanIn broker and stops processing requests.
func (*FanIn) Connected ¶
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*FanIn) TransactionChan ¶
func (i *FanIn) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming transactions from this broker.
type FanOut ¶
type FanOut struct {
// contains filtered or unexported fields
}
FanOut is a broker that implements types.Consumer and broadcasts each message out to an array of outputs.
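Broadcasting each message to every output can be sketched with plain channels. The broadcast function below is a hypothetical illustration of the technique, not the FanOut type. It assumes that a message counts as delivered only once all outputs have received it.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast copies every message from in to each output channel concurrently
// and only moves on to the next message once all outputs have received it.
// The outputs are closed when in is closed.
func broadcast(in <-chan string, outputs ...chan<- string) {
	for msg := range in {
		var wg sync.WaitGroup
		wg.Add(len(outputs))
		for _, out := range outputs {
			go func(out chan<- string, msg string) {
				defer wg.Done()
				out <- msg
			}(out, msg)
		}
		wg.Wait()
	}
	for _, out := range outputs {
		close(out)
	}
}

func main() {
	in := make(chan string)
	a, b := make(chan string, 2), make(chan string, 2)
	go broadcast(in, a, b)
	in <- "hello"
	in <- "world"
	close(in)
	for msg := range a {
		fmt.Println("a:", msg)
	}
	for msg := range b {
		fmt.Println("b:", msg)
	}
}
```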
func (*FanOut) CloseAsync ¶
func (o *FanOut) CloseAsync()
CloseAsync shuts down the FanOut broker and stops processing requests.
func (*FanOut) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*FanOut) Consume ¶
func (o *FanOut) Consume(transactions <-chan types.Transaction) error
Consume assigns a new transactions channel for the broker to read.
func (*FanOut) MaxInFlight ¶ added in v3.45.1
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
func (*FanOut) WaitForClose ¶
WaitForClose blocks until the FanOut broker has closed down.
func (*FanOut) WithMaxInFlight ¶ added in v3.12.0
WithMaxInFlight sets the maximum number of in-flight messages this broker supports. This must be set before calling Consume.
type FanOutSequential ¶
type FanOutSequential struct {
// contains filtered or unexported fields
}
FanOutSequential is a broker that implements types.Consumer and broadcasts each message out to an array of outputs, but does so sequentially, only proceeding onto an output when the preceding output has successfully reported message receipt.
func NewFanOutSequential ¶
func NewFanOutSequential(outputs []types.Output, logger log.Modular, stats metrics.Type) (*FanOutSequential, error)
NewFanOutSequential creates a new FanOutSequential type by providing outputs.
func (*FanOutSequential) CloseAsync ¶
func (o *FanOutSequential) CloseAsync()
CloseAsync shuts down the FanOutSequential broker and stops processing requests.
func (*FanOutSequential) Connected ¶
func (o *FanOutSequential) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*FanOutSequential) Consume ¶
func (o *FanOutSequential) Consume(transactions <-chan types.Transaction) error
Consume assigns a new transactions channel for the broker to read.
func (*FanOutSequential) MaxInFlight ¶ added in v3.45.1
func (o *FanOutSequential) MaxInFlight() (int, bool)
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
func (*FanOutSequential) WaitForClose ¶
func (o *FanOutSequential) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the FanOutSequential broker has closed down.
func (*FanOutSequential) WithMaxInFlight ¶ added in v3.12.0
func (o *FanOutSequential) WithMaxInFlight(i int) *FanOutSequential
WithMaxInFlight sets the maximum number of in-flight messages this broker supports. This must be set before calling Consume.
type Greedy ¶
type Greedy struct {
// contains filtered or unexported fields
}
Greedy is a broker that implements types.Consumer and sends each message out to a single consumer chosen from an array in round-robin fashion. Consumers that apply backpressure will block all consumers.
func (*Greedy) CloseAsync ¶
func (g *Greedy) CloseAsync()
CloseAsync shuts down the Greedy broker and stops processing requests.
func (*Greedy) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*Greedy) Consume ¶
func (g *Greedy) Consume(ts <-chan types.Transaction) error
Consume assigns a new messages channel for the broker to read.
func (*Greedy) MaxInFlight ¶ added in v3.45.1
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
type RoundRobin ¶
type RoundRobin struct {
// contains filtered or unexported fields
}
RoundRobin is a broker that implements types.Consumer and sends each message out to a single consumer chosen from an array in round-robin fashion. Consumers that apply backpressure will block all consumers.
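Round-robin dispatch and its backpressure behaviour can be sketched with plain channels. The roundRobin function is a hypothetical illustration, not the RoundRobin type. Because each send blocks, one slow output stalls delivery to all of them, which matches the note above.

```go
package main

import "fmt"

// roundRobin sends each message from in to exactly one output, cycling
// through the outputs in order. The send blocks, so a single slow output
// holds up every other output. At least one output is required.
// The outputs are closed when in is closed.
func roundRobin(in <-chan string, outputs ...chan<- string) {
	i := 0
	for msg := range in {
		outputs[i] <- msg
		i = (i + 1) % len(outputs)
	}
	for _, out := range outputs {
		close(out)
	}
}

func main() {
	in := make(chan string, 4)
	a, b := make(chan string, 2), make(chan string, 2)
	for _, m := range []string{"m0", "m1", "m2", "m3"} {
		in <- m
	}
	close(in)
	roundRobin(in, a, b)
	for msg := range a {
		fmt.Println("a:", msg) // m0, then m2
	}
	for msg := range b {
		fmt.Println("b:", msg) // m1, then m3
	}
}
```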
func NewRoundRobin ¶
NewRoundRobin creates a new RoundRobin type by providing consumers.
func (*RoundRobin) CloseAsync ¶
func (o *RoundRobin) CloseAsync()
CloseAsync shuts down the RoundRobin broker and stops processing requests.
func (*RoundRobin) Connected ¶
func (o *RoundRobin) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*RoundRobin) Consume ¶
func (o *RoundRobin) Consume(ts <-chan types.Transaction) error
Consume assigns a new messages channel for the broker to read.
func (*RoundRobin) MaxInFlight ¶ added in v3.45.1
func (o *RoundRobin) MaxInFlight() (m int, ok bool)
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
func (*RoundRobin) WaitForClose ¶
func (o *RoundRobin) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the RoundRobin broker has closed down.
type Try ¶
type Try struct {
// contains filtered or unexported fields
}
Try is a broker that implements types.Consumer and attempts to send each message to a single output, but on failure will attempt the next output in the list.
func (*Try) CloseAsync ¶
func (t *Try) CloseAsync()
CloseAsync shuts down the Try broker and stops processing requests.
func (*Try) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*Try) Consume ¶
func (t *Try) Consume(ts <-chan types.Transaction) error
Consume assigns a new messages channel for the broker to read.
func (*Try) MaxInFlight ¶ added in v3.45.1
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
func (*Try) WaitForClose ¶
WaitForClose blocks until the Try broker has closed down.
func (*Try) WithMaxInFlight ¶ added in v3.12.0
WithMaxInFlight sets the maximum number of in-flight messages this broker supports. This must be set before calling Consume.
func (*Try) WithOutputMetricsPrefix ¶ added in v3.7.0
WithOutputMetricsPrefix changes the prefix used for counter metrics showing errors of an output.