Documentation ¶
Overview ¶
Package types defines the general structs and interfaces used throughout the Benthos codebase.
Benthos uses abstract types to represent arbitrary producers and consumers of data to its core components. This allows us to construct types for piping data in various arrangements without regard for the specific destinations and sources of our data.
The basic principle behind a producer/consumer relationship is that a producer pipes data to the consumer in lock-step, where for each message sent it will expect a response that confirms the message was received and propagated onwards.
Messages and responses are sent via channels, and in order to instigate this pairing each type is expected to create and maintain ownership of its respective sending channel.
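The lock-step relationship described above can be sketched as follows. This is a minimal illustration rather than the Benthos implementation: `Response` and `Transaction` are simplified local stand-ins for the package types of the same names, and `produce`, `consume` and `simpleResponse` are hypothetical helpers. The producer owns the transaction channel and the response channel, and does not send the next message until the previous one has been resolved.

```go
package main

import (
	"errors"
	"fmt"
)

// Response and Transaction are simplified stand-ins for the package types of
// the same names, declared locally so that the sketch compiles on its own.
type Response interface {
	Error() error
}

type simpleResponse struct{ err error }

func (s simpleResponse) Error() error { return s.err }

type Transaction struct {
	Payload      [][]byte
	ResponseChan chan<- Response
}

// produce (hypothetical) creates and owns the transaction channel. It sends
// one transaction per payload and waits for the response before sending the
// next. It stops at the first failed response.
func produce(payloads [][]byte) (<-chan Transaction, <-chan error) {
	tChan := make(chan Transaction)
	done := make(chan error, 1)
	go func() {
		defer close(tChan)
		resChan := make(chan Response)
		for _, p := range payloads {
			tChan <- Transaction{Payload: [][]byte{p}, ResponseChan: resChan}
			if res := <-resChan; res.Error() != nil {
				done <- res.Error()
				return
			}
		}
		done <- nil
	}()
	return tChan, done
}

// consume (hypothetical) resolves every transaction it receives, rejecting
// empty payloads, and returns the number of messages it accepted.
func consume(tChan <-chan Transaction) (handled int) {
	for t := range tChan {
		var err error
		if len(t.Payload[0]) == 0 {
			err = errors.New("empty payload")
		} else {
			handled++
		}
		t.ResponseChan <- simpleResponse{err: err}
	}
	return handled
}

func main() {
	tChan, done := produce([][]byte{[]byte("foo"), []byte("bar")})
	n := consume(tChan)
	fmt.Println(n, <-done) // prints "2 <nil>"
}
```

Because both channels are unbuffered, each send blocks until the other side is ready, which is what keeps the two sides in step.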
Index ¶
- Variables
- type Cache
- type Closable
- type Condition
- type Consumer
- type DudMgr
- func (f DudMgr) GetCache(name string) (Cache, error)
- func (f DudMgr) GetCondition(name string) (Condition, error)
- func (f DudMgr) GetPipe(name string) (<-chan Transaction, error)
- func (f DudMgr) RegisterEndpoint(path, desc string, h http.HandlerFunc)
- func (f DudMgr) SetPipe(name string, t <-chan Transaction)
- func (f DudMgr) UnsetPipe(name string, t <-chan Transaction)
- type ErrUnexpectedHTTPRes
- type HTTPMessage
- type HTTPResponse
- type Input
- type Manager
- type Message
- type Output
- type Producer
- type Response
- type SimpleResponse
- type Transaction
- type UnacknowledgedResponse
Constants ¶
This section is empty.
Variables ¶
var (
	ErrTimeout               = errors.New("action timed out")
	ErrChanClosed            = errors.New("channel was closed unexpectedly")
	ErrTypeClosed            = errors.New("type was closed")
	ErrNotConnected          = errors.New("not connected to target source or sink")
	ErrInvalidProcessorType  = errors.New("processor type was not recognised")
	ErrInvalidCacheType      = errors.New("cache type was not recognised")
	ErrInvalidConditionType  = errors.New("condition type was not recognised")
	ErrInvalidBufferType     = errors.New("buffer type was not recognised")
	ErrInvalidInputType      = errors.New("input type was not recognised")
	ErrInvalidOutputType     = errors.New("output type was not recognised")
	ErrInvalidZMQType        = errors.New("invalid ZMQ socket type")
	ErrInvalidScaleProtoType = errors.New("invalid Scalability Protocols socket type")

	// ErrAlreadyStarted is returned when an input or output type gets started a
	// second time.
	ErrAlreadyStarted      = errors.New("type has already been started")
	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes     = errors.New("serialised message bytes were in unexpected format")
	ErrBlockCorrupted      = errors.New("serialised messages block was in unexpected format")
	ErrNoAck               = errors.New("failed to receive acknowledgement")
)
Errors used throughout the codebase
var (
	ErrCacheNotFound     = errors.New("cache not found")
	ErrConditionNotFound = errors.New("condition not found")
	ErrKeyAlreadyExists  = errors.New("key already exists")
	ErrKeyNotFound       = errors.New("key does not exist")
	ErrPipeNotFound      = errors.New("pipe was not found")
)
Manager errors
var (
ErrMessageTooLarge = errors.New("message body larger than buffer space")
)
Buffer errors
Functions ¶
This section is empty.
Types ¶
type Cache ¶ added in v0.9.7
type Cache interface {
	// Get attempts to locate and return a cached value by its key, returns an
	// error if the key does not exist or if the command fails.
	Get(key string) ([]byte, error)

	// Set attempts to set the value of a key, returns an error if the command
	// fails.
	Set(key string, value []byte) error

	// Add attempts to set the value of a key only if the key does not already
	// exist, returns an error if the key already exists or if the command
	// fails.
	Add(key string, value []byte) error

	// Delete attempts to remove a key. Returns an error if a failure occurs.
	Delete(key string) error
}
Cache is a key/value store that can be shared across components and executing threads of a Benthos service.
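As an illustration of the contract, a mutex-guarded map is enough to satisfy the four methods. The sketch below is not a cache shipped with Benthos: `memCache` and `newMemCache` are hypothetical names, the two error variables are local copies of the manager errors listed above, and returning them from `Get` and `Add` is an assumption about how an implementation might signal those cases. Treating `Delete` of a missing key as a success is also an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copies of the package errors so that the sketch compiles on its own.
var (
	ErrKeyAlreadyExists = errors.New("key already exists")
	ErrKeyNotFound      = errors.New("key does not exist")
)

// memCache is a hypothetical in-memory cache that is safe for concurrent use.
type memCache struct {
	mut   sync.RWMutex
	items map[string][]byte
}

func newMemCache() *memCache {
	return &memCache{items: map[string][]byte{}}
}

// Get returns the stored value, or ErrKeyNotFound if the key is absent.
func (m *memCache) Get(key string) ([]byte, error) {
	m.mut.RLock()
	defer m.mut.RUnlock()
	v, ok := m.items[key]
	if !ok {
		return nil, ErrKeyNotFound
	}
	return v, nil
}

// Set stores the value, overwriting any existing entry.
func (m *memCache) Set(key string, value []byte) error {
	m.mut.Lock()
	defer m.mut.Unlock()
	m.items[key] = value
	return nil
}

// Add stores the value only if the key is not already present.
func (m *memCache) Add(key string, value []byte) error {
	m.mut.Lock()
	defer m.mut.Unlock()
	if _, ok := m.items[key]; ok {
		return ErrKeyAlreadyExists
	}
	m.items[key] = value
	return nil
}

// Delete removes the key. Removing a missing key is not treated as an error
// here, which is an assumption of this sketch.
func (m *memCache) Delete(key string) error {
	m.mut.Lock()
	defer m.mut.Unlock()
	delete(m.items, key)
	return nil
}

func main() {
	c := newMemCache()
	fmt.Println(c.Add("foo", []byte("1"))) // first Add succeeds
	fmt.Println(c.Add("foo", []byte("2"))) // second Add is rejected
	v, err := c.Get("foo")
	fmt.Printf("%s %v\n", v, err)
}
```

The check and the write inside `Add` happen under one write lock, so two concurrent callers cannot both succeed for the same key.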
type Closable ¶
type Closable interface {
	// CloseAsync triggers a closure of this object but does not block until
	// completion.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the object has finished
	// closing down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}
Closable defines a type that can be safely closed down and cleaned up.
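One common way to satisfy this interface is with a pair of signalling channels: one to request shutdown and one to report that cleanup has finished. The sketch below assumes that approach. The `worker` type and its fields are hypothetical, `ErrTimeout` is a local copy of the package error, and returning it when the timeout elapses is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrTimeout is a local copy of the package error of the same name.
var ErrTimeout = errors.New("action timed out")

// worker is a hypothetical component with a background goroutine.
type worker struct {
	closeChan  chan struct{} // closed to request shutdown
	closedChan chan struct{} // closed once shutdown has completed
	closeOnce  sync.Once
}

func newWorker() *worker {
	w := &worker{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go w.loop()
	return w
}

// loop blocks until shutdown is requested, then signals completion. A real
// component would do its work and release its resources here.
func (w *worker) loop() {
	defer close(w.closedChan)
	<-w.closeChan
}

// CloseAsync requests shutdown without blocking. It is safe to call more
// than once.
func (w *worker) CloseAsync() {
	w.closeOnce.Do(func() { close(w.closeChan) })
}

// WaitForClose blocks until shutdown has completed or the timeout elapses.
func (w *worker) WaitForClose(timeout time.Duration) error {
	select {
	case <-w.closedChan:
		return nil
	case <-time.After(timeout):
		return ErrTimeout
	}
}

func main() {
	w := newWorker()
	fmt.Println(w.WaitForClose(10 * time.Millisecond)) // not yet closed
	w.CloseAsync()
	fmt.Println(w.WaitForClose(time.Second))
}
```

Separating the request from the wait lets a caller trigger the shutdown of many components first and then wait on each in turn.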
type Condition ¶ added in v0.9.7
type Condition interface {
	// Check tests a message against a configured condition.
	Check(msg Message) bool
}
Condition reads a message, calculates a condition and returns a boolean.
type Consumer ¶
type Consumer interface {
	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan Transaction) error
}
Consumer is the higher level consumer type.
type DudMgr ¶ added in v0.11.4
type DudMgr struct {
ID int
}
DudMgr is a noop implementation of a types.Manager.
func (DudMgr) GetCondition ¶ added in v0.11.4
func (f DudMgr) GetCondition(name string) (Condition, error)
GetCondition always returns ErrConditionNotFound.
func (DudMgr) GetPipe ¶ added in v0.15.4
func (f DudMgr) GetPipe(name string) (<-chan Transaction, error)
GetPipe attempts to find a service wide message producer by its name.
func (DudMgr) RegisterEndpoint ¶ added in v0.11.4
func (f DudMgr) RegisterEndpoint(path, desc string, h http.HandlerFunc)
RegisterEndpoint is a noop.
func (DudMgr) SetPipe ¶ added in v0.15.4
func (f DudMgr) SetPipe(name string, t <-chan Transaction)
SetPipe registers a message producer under a name.
func (DudMgr) UnsetPipe ¶ added in v0.15.4
func (f DudMgr) UnsetPipe(name string, t <-chan Transaction)
UnsetPipe removes a named pipe.
type ErrUnexpectedHTTPRes ¶
ErrUnexpectedHTTPRes is an error returned when an HTTP request returned an unexpected response.
func (ErrUnexpectedHTTPRes) Error ¶
func (e ErrUnexpectedHTTPRes) Error() string
Error returns the Error string.
type HTTPMessage ¶
type HTTPMessage struct {
Parts []string `json:"parts"`
}
HTTPMessage is a struct containing a benthos message, to be sent over the wire in an HTTP request. Message parts are sent as JSON strings.
type HTTPResponse ¶
type HTTPResponse struct {
Error string `json:"error"`
}
HTTPResponse is a struct expected as an HTTP response after sending a message. The intention is to confirm that the message has been received successfully.
type Manager ¶ added in v0.8.0
type Manager interface {
	// RegisterEndpoint registers a server wide HTTP endpoint.
	RegisterEndpoint(path, desc string, h http.HandlerFunc)

	// GetCache attempts to find a service wide cache by its name.
	GetCache(name string) (Cache, error)

	// GetCondition attempts to find a service wide condition by its name.
	GetCondition(name string) (Condition, error)

	// GetPipe attempts to find a service wide transaction chan by its name.
	GetPipe(name string) (<-chan Transaction, error)

	// SetPipe registers a transaction chan under a name.
	SetPipe(name string, t <-chan Transaction)

	// UnsetPipe removes a named transaction chan.
	UnsetPipe(name string, t <-chan Transaction)
}
Manager is an interface expected by Benthos components that allows them to register their service wide behaviours such as HTTP endpoints and event listeners, and obtain service wide shared resources such as caches.
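The three pipe methods could plausibly be backed by a guarded map from names to channels. The sketch below shows only that part of the interface and is not the Benthos manager. `pipeRegistry` is a hypothetical type, `Transaction` is an empty local stand-in, and `ErrPipeNotFound` is a local copy of the package error. Having `UnsetPipe` remove the entry only when the registered channel matches the one passed in is an assumption about why the method takes a channel argument.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Transaction is an empty stand-in for the package type of the same name.
type Transaction struct{}

// ErrPipeNotFound is a local copy of the package error of the same name.
var ErrPipeNotFound = errors.New("pipe was not found")

// pipeRegistry is a hypothetical store of named transaction channels.
type pipeRegistry struct {
	mut   sync.RWMutex
	pipes map[string]<-chan Transaction
}

func newPipeRegistry() *pipeRegistry {
	return &pipeRegistry{pipes: map[string]<-chan Transaction{}}
}

// GetPipe returns the channel registered under name, or ErrPipeNotFound.
func (r *pipeRegistry) GetPipe(name string) (<-chan Transaction, error) {
	r.mut.RLock()
	defer r.mut.RUnlock()
	if p, ok := r.pipes[name]; ok {
		return p, nil
	}
	return nil, ErrPipeNotFound
}

// SetPipe registers t under name, replacing any previous registration.
func (r *pipeRegistry) SetPipe(name string, t <-chan Transaction) {
	r.mut.Lock()
	defer r.mut.Unlock()
	r.pipes[name] = t
}

// UnsetPipe removes the registration for name, but only if the registered
// channel is t. This guard is an assumption of the sketch.
func (r *pipeRegistry) UnsetPipe(name string, t <-chan Transaction) {
	r.mut.Lock()
	defer r.mut.Unlock()
	if p, ok := r.pipes[name]; ok && p == t {
		delete(r.pipes, name)
	}
}

func main() {
	r := newPipeRegistry()
	var pipe <-chan Transaction = make(chan Transaction)

	r.SetPipe("foo", pipe)
	_, err := r.GetPipe("foo")
	fmt.Println(err)

	r.UnsetPipe("foo", pipe)
	_, err = r.GetPipe("foo")
	fmt.Println(err)
}
```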
type Message ¶
type Message interface {
	// Get attempts to access a message part from an index. If the index is
	// negative then the part is found by counting backwards from the last part
	// starting at -1. If the index is out of bounds then nil is returned. It is
	// not safe to edit the contents of a message directly.
	Get(p int) []byte

	// GetAll returns all message parts as a two-dimensional byte array. It is
	// NOT safe to edit the contents of the result.
	GetAll() [][]byte

	// Set edits the contents of an existing message part. If the index is
	// negative then the part is found by counting backwards from the last part
	// starting at -1. If the index is out of bounds then nothing is done.
	Set(p int, b []byte)

	// SetAll replaces all parts of a message with a new set.
	SetAll(p [][]byte)

	// GetMetadata returns a metadata value if the key exists.
	GetMetadata(key string) string

	// SetMetadata sets the value of a metadata key.
	SetMetadata(key, value string)

	// IterMetadata iterates the message metadata, calling func on each
	// key/value pair.
	IterMetadata(f func(k, v string) error) error

	// GetJSON returns a message part parsed as JSON into an `interface{}` type.
	// This is lazily evaluated and the result is cached. If multiple layers of
	// a pipeline extract the same part as JSON it will only be unmarshalled
	// once, unless the content of the part has changed. If the index is
	// negative then the part is found by counting backwards from the last part
	// starting at -1.
	GetJSON(p int) (interface{}, error)

	// SetJSON sets a message part to the marshalled bytes of a JSON object, but
	// also caches the object itself. If the JSON contents of a part is
	// subsequently queried it will receive the cached version as long as the
	// raw content has not changed. If the index is negative then the part is
	// found by counting backwards from the last part starting at -1.
	SetJSON(p int, jObj interface{}) error

	// Append appends new message parts to the message and returns the index of
	// last part to be added.
	Append(b ...[]byte) int

	// Len returns the number of parts this message contains.
	Len() int

	// Iter will iterate each message part in order, calling the closure
	// argument with the index and contents of the message part.
	Iter(f func(i int, b []byte) error) error

	// Bytes returns a binary representation of the message, which can be later
	// parsed back into a multipart message with `FromBytes`. The result of this
	// call can itself be the part of a new message, which is a useful way of
	// transporting multiple part messages across protocols that only support
	// single parts.
	Bytes() []byte

	// LazyCondition lazily evaluates conditions on the message by caching the
	// results as per a label to identify the condition. The cache of results is
	// cleared whenever the contents of the message is changed.
	LazyCondition(label string, cond Condition) bool

	// ShallowCopy creates a shallow copy of the message, where the list of
	// message parts can be edited independently from the original version.
	// However, editing the byte array contents of a message part will still
	// alter the contents of the original.
	ShallowCopy() Message

	// DeepCopy creates a deep copy of the message, where the message part
	// contents are entirely copied and are therefore safe to edit without
	// altering the original.
	DeepCopy() Message

	// CreatedAt returns the time at which the message was created.
	CreatedAt() time.Time
}
Message is an interface representing a payload of data that was received from an input. Messages contain multiple parts, where each part is a byte array. If an input supports only single part messages they will still be read as multipart messages with one part.
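Two behaviours in the method comments above are easy to get wrong: negative indexing, and the difference between a shallow and a deep copy. The sketch below illustrates both on a bare slice of parts. The `parts` type and its `index` helper are hypothetical and cover only a small subset of the interface; they are not the Benthos message implementation.

```go
package main

import "fmt"

// parts is a hypothetical, minimal stand-in for a multipart message.
type parts [][]byte

// index maps a possibly negative index onto a slice position and reports
// whether the position is within bounds. An index of -1 is the last part.
func (p parts) index(i int) (int, bool) {
	if i < 0 {
		i += len(p)
	}
	return i, i >= 0 && i < len(p)
}

// Get returns the part at index i, or nil if i is out of bounds.
func (p parts) Get(i int) []byte {
	if idx, ok := p.index(i); ok {
		return p[idx]
	}
	return nil
}

// Set replaces the part at index i and does nothing if i is out of bounds.
func (p parts) Set(i int, b []byte) {
	if idx, ok := p.index(i); ok {
		p[idx] = b
	}
}

// ShallowCopy copies the list of parts but shares the underlying bytes.
func (p parts) ShallowCopy() parts {
	c := make(parts, len(p))
	copy(c, p)
	return c
}

// DeepCopy copies the list of parts and the bytes of every part.
func (p parts) DeepCopy() parts {
	c := make(parts, len(p))
	for i, b := range p {
		c[i] = append([]byte(nil), b...)
	}
	return c
}

func main() {
	msg := parts{[]byte("foo"), []byte("bar"), []byte("baz")}
	fmt.Printf("%s %s %v\n", msg.Get(0), msg.Get(-1), msg.Get(3) == nil) // prints "foo baz true"

	shallow, deep := msg.ShallowCopy(), msg.DeepCopy()
	shallow.Get(0)[0] = 'F' // shared bytes: the original changes too
	deep.Get(1)[0] = 'B'    // copied bytes: the original is untouched
	fmt.Printf("%s %s\n", msg.Get(0), msg.Get(1)) // prints "Foo bar"
}
```

Replacing a whole part on a shallow copy with `Set` leaves the original untouched, whereas writing into the bytes of a shared part does not.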
func LockMessage ¶ added in v0.15.0
LockMessage wraps a message into a read only type that restricts access to only a single message part, accessible either by index 0 or -1.
type Producer ¶
type Producer interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan Transaction
}
Producer is a type that sends messages as transactions and waits for a response back. The response indicates whether the message was successfully propagated to a new destination (and can therefore be discarded from the source).
type Response ¶
type Response interface {
	// Error returns a non-nil error if the message failed to reach its
	// destination.
	Error() error

	// SkipAck indicates that even though there may not have been an error in
	// processing a message, it should not be acknowledged. If SkipAck is false
	// and Error is nil then all unacknowledged messages should be acknowledged
	// also.
	SkipAck() bool
}
Response is a response from an output, agent or broker that confirms the input of successful message receipt.
type SimpleResponse ¶
type SimpleResponse struct {
// contains filtered or unexported fields
}
SimpleResponse is returned by an output or agent to provide a single return message.
func NewSimpleResponse ¶
func NewSimpleResponse(err error) SimpleResponse
NewSimpleResponse returns a response with an error (nil error signals successful receipt).
func (SimpleResponse) Error ¶
func (o SimpleResponse) Error() error
Error returns the underlying error.
func (SimpleResponse) SkipAck ¶ added in v0.3.1
func (o SimpleResponse) SkipAck() bool
SkipAck indicates whether a successful message should be acknowledged.
type Transaction ¶ added in v0.9.0
type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload Message

	// ResponseChan should receive a response at the end of a transaction (once
	// the message is no longer owned by the receiver.) The response itself
	// indicates whether the message has been propagated successfully.
	ResponseChan chan<- Response
}
Transaction is a type representing a transaction containing a payload (the message) and a response channel, which is used to indicate whether the message was successfully propagated to the next destination.
func NewTransaction ¶ added in v0.9.0
func NewTransaction(payload Message, resChan chan<- Response) Transaction
NewTransaction creates a new transaction object from a message payload and a response channel.
type UnacknowledgedResponse ¶ added in v0.3.1
type UnacknowledgedResponse struct{}
UnacknowledgedResponse is a response type that indicates the message has reached its destination but should not be acknowledged.
func NewUnacknowledgedResponse ¶ added in v0.3.1
func NewUnacknowledgedResponse() UnacknowledgedResponse
NewUnacknowledgedResponse returns an UnacknowledgedResponse.
func (UnacknowledgedResponse) Error ¶ added in v0.3.1
func (u UnacknowledgedResponse) Error() error
Error returns the underlying error.
func (UnacknowledgedResponse) SkipAck ¶ added in v0.3.1
func (u UnacknowledgedResponse) SkipAck() bool
SkipAck indicates whether a successful message should be acknowledged.