broker

package
v0.14.4
Warning

This package is not in the latest version of its module.

Published: Jun 19, 2018 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package broker implements types used for routing inputs to outputs in non-trivial arrangements, such as fan-out or fan-in models.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OptDynamicFanInSetOnAdd

func OptDynamicFanInSetOnAdd(onAddFunc func(label string)) func(*DynamicFanIn)

OptDynamicFanInSetOnAdd sets the function that is called whenever a dynamic input is added.

func OptDynamicFanInSetOnRemove

func OptDynamicFanInSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanIn)

OptDynamicFanInSetOnRemove sets the function that is called whenever a dynamic input is removed.

func OptDynamicFanOutSetOnAdd

func OptDynamicFanOutSetOnAdd(onAddFunc func(label string)) func(*DynamicFanOut)

OptDynamicFanOutSetOnAdd sets the function that is called whenever a dynamic output is added.

func OptDynamicFanOutSetOnRemove

func OptDynamicFanOutSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanOut)

OptDynamicFanOutSetOnRemove sets the function that is called whenever a dynamic output is removed.
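
The four Opt functions above follow the functional options pattern: each returns a closure that mutates the broker while it is being constructed. The following is a minimal, self-contained sketch of that pattern. The names fanIn, optSetOnAdd, optSetOnRemove, newFanIn, add and remove are hypothetical stand-ins and are not part of this package; the sketch only mirrors the shape of the signatures listed above.

```go
package main

import "fmt"

// fanIn is a hypothetical stand-in for a broker type. It is not the
// package's DynamicFanIn.
type fanIn struct {
	onAdd    func(label string)
	onRemove func(label string)
	inputs   map[string]struct{}
}

// optSetOnAdd has the same shape as OptDynamicFanInSetOnAdd: it returns a
// function that installs the callback on the broker.
func optSetOnAdd(f func(label string)) func(*fanIn) {
	return func(d *fanIn) { d.onAdd = f }
}

// optSetOnRemove has the same shape as OptDynamicFanInSetOnRemove.
func optSetOnRemove(f func(label string)) func(*fanIn) {
	return func(d *fanIn) { d.onRemove = f }
}

// newFanIn applies each option in order after setting no-op defaults.
func newFanIn(opts ...func(*fanIn)) *fanIn {
	d := &fanIn{
		onAdd:    func(string) {},
		onRemove: func(string) {},
		inputs:   map[string]struct{}{},
	}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

// add registers a label and fires the onAdd callback.
func (d *fanIn) add(label string) {
	d.inputs[label] = struct{}{}
	d.onAdd(label)
}

// remove deletes a label and fires onRemove only if the label existed.
func (d *fanIn) remove(label string) {
	if _, ok := d.inputs[label]; !ok {
		return
	}
	delete(d.inputs, label)
	d.onRemove(label)
}

func main() {
	d := newFanIn(
		optSetOnAdd(func(l string) { fmt.Println("added:", l) }),
		optSetOnRemove(func(l string) { fmt.Println("removed:", l) }),
	)
	d.add("foo")
	d.remove("foo")
}
```

With the real package, the equivalent call would presumably pass the returned options as the trailing arguments of NewDynamicFanIn or NewDynamicFanOut.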

Types

type DynamicFanIn

type DynamicFanIn struct {
	// contains filtered or unexported fields
}

DynamicFanIn is a broker that implements types.Producer and manages a map of inputs to unique string identifiers, routing them through a single message channel. Inputs can be added and removed dynamically as the broker runs.

func NewDynamicFanIn

func NewDynamicFanIn(
	inputs map[string]DynamicInput,
	logger log.Modular,
	stats metrics.Type,
	options ...func(*DynamicFanIn),
) (*DynamicFanIn, error)

NewDynamicFanIn creates a new DynamicFanIn type by providing an initial map of inputs.

func (*DynamicFanIn) CloseAsync

func (d *DynamicFanIn) CloseAsync()

CloseAsync shuts down the DynamicFanIn broker and stops processing requests.

func (*DynamicFanIn) SetInput

func (d *DynamicFanIn) SetInput(ident string, input DynamicInput, timeout time.Duration) error

SetInput attempts to add a new input to the dynamic input broker. If an input already exists with the same identifier it will be closed and removed. If either action takes longer than the timeout period an error will be returned.

A nil input is safe and will simply remove the previous input under the identifier, if there was one.
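
The replace-or-remove semantics described for SetInput can be sketched with a small registry. This is an illustration under stated assumptions, not the package's implementation: closable, stubInput, registry and set are hypothetical names, and the choice to leave a slow-closing entry in the map when the timeout expires is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// closable is a hypothetical stand-in for the closing half of DynamicInput.
type closable interface {
	CloseAsync()
	WaitForClose(timeout time.Duration) error
}

// stubInput is a fake input that finishes closing after a fixed delay.
type stubInput struct {
	delay time.Duration
	once  sync.Once
	done  chan struct{}
}

func newStubInput(delay time.Duration) *stubInput {
	return &stubInput{delay: delay, done: make(chan struct{})}
}

// CloseAsync triggers shutdown without blocking the caller.
func (s *stubInput) CloseAsync() {
	s.once.Do(func() {
		go func() {
			time.Sleep(s.delay)
			close(s.done)
		}()
	})
}

// WaitForClose blocks until shutdown completes or the timeout expires.
func (s *stubInput) WaitForClose(timeout time.Duration) error {
	select {
	case <-s.done:
		return nil
	case <-time.After(timeout):
		return errors.New("timed out waiting for close")
	}
}

// registry maps identifiers to inputs.
type registry struct {
	mu     sync.Mutex
	inputs map[string]closable
}

func newRegistry() *registry {
	return &registry{inputs: map[string]closable{}}
}

// set closes and removes any existing input under ident, then stores in
// unless it is nil. A nil input therefore only removes the previous entry.
func (r *registry) set(ident string, in closable, timeout time.Duration) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if prev, ok := r.inputs[ident]; ok {
		prev.CloseAsync()
		if err := prev.WaitForClose(timeout); err != nil {
			return err // assumption: the old entry stays until it closes
		}
		delete(r.inputs, ident)
	}
	if in != nil {
		r.inputs[ident] = in
	}
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println(r.set("foo", newStubInput(0), time.Second))
	fmt.Println(r.set("foo", nil, time.Second)) // removes "foo"
	fmt.Println(r.set("slow", newStubInput(time.Second), time.Second))
	fmt.Println(r.set("slow", nil, 10*time.Millisecond)) // close exceeds timeout
}
```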

func (*DynamicFanIn) TransactionChan

func (d *DynamicFanIn) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming transactions from this broker.

func (*DynamicFanIn) WaitForClose

func (d *DynamicFanIn) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the DynamicFanIn broker has closed down.
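
Every broker in this package exposes the same CloseAsync and WaitForClose pair. A common way to build such a pair in Go uses two channels: one to request shutdown and one to signal that shutdown has finished. The sketch below shows that idea. The worker type is hypothetical, and returning an error when the timeout expires is an assumption, since the text above does not state what WaitForClose returns on timeout.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical component with a background loop.
type worker struct {
	closeOnce  sync.Once
	closeChan  chan struct{} // closed to request shutdown
	closedChan chan struct{} // closed once the loop has exited
}

func newWorker() *worker {
	w := &worker{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go w.loop()
	return w
}

// loop runs until shutdown is requested, then signals completion.
func (w *worker) loop() {
	defer close(w.closedChan)
	<-w.closeChan
	// Resource cleanup would go here.
}

// CloseAsync requests shutdown without blocking. It is safe to call
// more than once.
func (w *worker) CloseAsync() {
	w.closeOnce.Do(func() { close(w.closeChan) })
}

// WaitForClose blocks until the loop has exited or the timeout expires.
func (w *worker) WaitForClose(timeout time.Duration) error {
	select {
	case <-w.closedChan:
		return nil
	case <-time.After(timeout):
		return errors.New("timed out waiting for close")
	}
}

func main() {
	w := newWorker()
	w.CloseAsync()
	if err := w.WaitForClose(time.Second); err != nil {
		fmt.Println("shutdown failed:", err)
		return
	}
	fmt.Println("shutdown complete")
}
```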

type DynamicFanOut

type DynamicFanOut struct {
	// contains filtered or unexported fields
}

DynamicFanOut is a broker that implements types.Consumer and broadcasts each message out to a dynamic map of outputs.

func NewDynamicFanOut

func NewDynamicFanOut(
	outputs map[string]DynamicOutput,
	logger log.Modular,
	stats metrics.Type,
	options ...func(*DynamicFanOut),
) (*DynamicFanOut, error)

NewDynamicFanOut creates a new DynamicFanOut type by providing outputs.

func (*DynamicFanOut) CloseAsync

func (d *DynamicFanOut) CloseAsync()

CloseAsync shuts down the DynamicFanOut broker and stops processing requests.

func (*DynamicFanOut) SetOutput

func (d *DynamicFanOut) SetOutput(ident string, output DynamicOutput, timeout time.Duration) error

SetOutput attempts to add a new output to the dynamic output broker. If an output already exists with the same identifier it will be closed and removed. If either action takes longer than the timeout period an error will be returned.

A nil output argument is safe and will simply remove the previous output under the identifier, if there was one.

func (*DynamicFanOut) StartReceiving

func (d *DynamicFanOut) StartReceiving(transactions <-chan types.Transaction) error

StartReceiving assigns a new transactions channel for the broker to read.

func (*DynamicFanOut) WaitForClose

func (d *DynamicFanOut) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the DynamicFanOut broker has closed down.

type DynamicInput

type DynamicInput interface {
	types.Transactor
	types.Closable
}

DynamicInput is an interface of input types that must be closable.

type DynamicOutput

type DynamicOutput interface {
	types.TransactionReceiver
	types.Closable
}

DynamicOutput is an interface of output types that must be closable.

type FanIn

type FanIn struct {
	// contains filtered or unexported fields
}

FanIn is a broker that implements types.Producer, takes an array of inputs and routes them through a single message channel.
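
The fan-in model can be illustrated with plain channels. This is a generic sketch of the technique, not the FanIn implementation: the function fanIn is hypothetical, and it merges string channels rather than types.Transaction values.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn forwards every message from each input onto one output channel.
// The output is closed once all inputs have been closed and drained.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(c <-chan string) {
			defer wg.Done()
			for msg := range c {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a := make(chan string)
	b := make(chan string)
	go func() { a <- "from a"; close(a) }()
	go func() { b <- "from b"; close(b) }()

	// Ordering between inputs is not deterministic.
	for msg := range fanIn(a, b) {
		fmt.Println(msg)
	}
}
```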

func NewFanIn

func NewFanIn(inputs []types.Producer, stats metrics.Type) (*FanIn, error)

NewFanIn creates a new FanIn type by providing inputs.

func (*FanIn) CloseAsync

func (i *FanIn) CloseAsync()

CloseAsync shuts down the FanIn broker and stops processing requests.

func (*FanIn) TransactionChan

func (i *FanIn) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming transactions from this broker.

func (*FanIn) WaitForClose

func (i *FanIn) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the FanIn broker has closed down.

type FanOut

type FanOut struct {
	// contains filtered or unexported fields
}

FanOut is a broker that implements types.Consumer and broadcasts each message out to an array of outputs.
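
Broadcasting, as described above, means every output receives a copy of every message. A minimal channel-based sketch of that behaviour follows. The function fanOut is hypothetical and works on strings; how the real FanOut handles acknowledgements and errors is not covered here.

```go
package main

import "fmt"

// fanOut copies each message from in to every output, in order. It does
// not read the next message until all outputs have accepted the current
// one. All outputs are closed when in is closed.
func fanOut(in <-chan string, outputs []chan<- string) {
	for msg := range in {
		for _, out := range outputs {
			out <- msg
		}
	}
	for _, out := range outputs {
		close(out)
	}
}

func main() {
	in := make(chan string, 2)
	in <- "hello"
	in <- "world"
	close(in)

	// Buffered outputs let this example run without extra goroutines.
	o1 := make(chan string, 2)
	o2 := make(chan string, 2)
	fanOut(in, []chan<- string{o1, o2})

	for msg := range o1 {
		fmt.Println("o1:", msg)
	}
	for msg := range o2 {
		fmt.Println("o2:", msg)
	}
}
```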

func NewFanOut

func NewFanOut(
	outputs []types.Output, logger log.Modular, stats metrics.Type,
) (*FanOut, error)

NewFanOut creates a new FanOut type by providing outputs.

func (*FanOut) CloseAsync

func (o *FanOut) CloseAsync()

CloseAsync shuts down the FanOut broker and stops processing requests.

func (*FanOut) StartReceiving

func (o *FanOut) StartReceiving(transactions <-chan types.Transaction) error

StartReceiving assigns a new transactions channel for the broker to read.

func (*FanOut) WaitForClose

func (o *FanOut) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the FanOut broker has closed down.

type Greedy

type Greedy struct {
	// contains filtered or unexported fields
}

Greedy is a broker that implements types.Consumer and sends each message out to a single consumer chosen from an array in round-robin fashion. Consumers that apply backpressure will block all consumers.

func NewGreedy

func NewGreedy(outputs []types.Output) (*Greedy, error)

NewGreedy creates a new Greedy type by providing consumers.

func (*Greedy) CloseAsync

func (g *Greedy) CloseAsync()

CloseAsync shuts down the Greedy broker and stops processing requests.

func (*Greedy) StartReceiving

func (g *Greedy) StartReceiving(ts <-chan types.Transaction) error

StartReceiving assigns a new transactions channel for the broker to read.

func (*Greedy) WaitForClose

func (g *Greedy) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Greedy broker has closed down.

type MockType

type MockType struct {
}

MockType implements the broker.Type interface.

func (MockType) CloseAsync

func (m MockType) CloseAsync()

CloseAsync does nothing.

func (MockType) WaitForClose

func (m MockType) WaitForClose(time.Duration) error

WaitForClose does nothing.

type RoundRobin

type RoundRobin struct {
	// contains filtered or unexported fields
}

RoundRobin is a broker that implements types.Consumer and sends each message out to a single consumer chosen from an array in round-robin fashion. Consumers that apply backpressure will block all consumers.
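
Round-robin dispatch and its backpressure behaviour can be sketched as follows. The function roundRobin is hypothetical and operates on strings. Because the send to the chosen output blocks, a slow output stalls delivery to the others, which matches the blocking behaviour described above.

```go
package main

import "fmt"

// roundRobin sends each message to exactly one output, cycling through
// the outputs in order. All outputs are closed when in is closed.
func roundRobin(in <-chan string, outputs []chan<- string) {
	if len(outputs) == 0 {
		return
	}
	i := 0
	for msg := range in {
		outputs[i] <- msg // blocks while this output applies backpressure
		i = (i + 1) % len(outputs)
	}
	for _, out := range outputs {
		close(out)
	}
}

func main() {
	in := make(chan string, 4)
	for _, m := range []string{"m0", "m1", "m2", "m3"} {
		in <- m
	}
	close(in)

	// Buffered outputs let this example run without extra goroutines.
	o1 := make(chan string, 2)
	o2 := make(chan string, 2)
	roundRobin(in, []chan<- string{o1, o2})

	for msg := range o1 {
		fmt.Println("o1:", msg)
	}
	for msg := range o2 {
		fmt.Println("o2:", msg)
	}
}
```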

func NewRoundRobin

func NewRoundRobin(outputs []types.Output, stats metrics.Type) (*RoundRobin, error)

NewRoundRobin creates a new RoundRobin type by providing consumers.

func (*RoundRobin) CloseAsync

func (o *RoundRobin) CloseAsync()

CloseAsync shuts down the RoundRobin broker and stops processing requests.

func (*RoundRobin) StartReceiving

func (o *RoundRobin) StartReceiving(ts <-chan types.Transaction) error

StartReceiving assigns a new transactions channel for the broker to read.

func (*RoundRobin) WaitForClose

func (o *RoundRobin) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RoundRobin broker has closed down.

type Try

type Try struct {
	// contains filtered or unexported fields
}

Try is a broker that implements types.Consumer and attempts to send each message to a single output, but on failure will attempt the next output in the list.
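
The fallback behaviour described above can be sketched with outputs modelled as functions that return an error. All names here (sendFunc, trySend, failing, collecting) are hypothetical, and returning the last error when every output fails is an assumption of the sketch rather than documented behaviour of Try.

```go
package main

import (
	"errors"
	"fmt"
)

// sendFunc is a hypothetical output: it returns nil on success.
type sendFunc func(msg string) error

// trySend offers msg to each output in order and stops at the first one
// that succeeds. If all outputs fail, the last error is returned.
func trySend(msg string, outputs []sendFunc) error {
	if len(outputs) == 0 {
		return errors.New("no outputs configured")
	}
	var err error
	for _, out := range outputs {
		if err = out(msg); err == nil {
			return nil
		}
	}
	return err
}

// failing returns an output that always rejects messages.
func failing(reason string) sendFunc {
	return func(string) error { return errors.New(reason) }
}

// collecting returns an output that appends accepted messages to dst.
func collecting(dst *[]string) sendFunc {
	return func(msg string) error {
		*dst = append(*dst, msg)
		return nil
	}
}

func main() {
	var delivered []string
	outputs := []sendFunc{failing("primary down"), collecting(&delivered)}

	if err := trySend("hello", outputs); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println("delivered via fallback:", delivered)
}
```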

func NewTry

func NewTry(outputs []types.Output, stats metrics.Type) (*Try, error)

NewTry creates a new Try type by providing consumers.

func (*Try) CloseAsync

func (t *Try) CloseAsync()

CloseAsync shuts down the Try broker and stops processing requests.

func (*Try) StartReceiving

func (t *Try) StartReceiving(ts <-chan types.Transaction) error

StartReceiving assigns a new transactions channel for the broker to read.

func (*Try) WaitForClose

func (t *Try) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Try broker has closed down.

type Type

type Type interface {
	types.Closable
}

Type is the standard interface of a broker type.
