Documentation ¶
Overview ¶
Package types defines the general structs and interfaces used throughout the Benthos codebase.
Benthos uses abstract types to represent arbitrary producers and consumers of data to its core components. This allows us to construct types for piping data in various arrangements without regard for the specific destinations and sources of our data.
The basic principle behind a producer/consumer relationship is that a producer pipes data to the consumer in lock-step, where for each message sent it will expect a response that confirms the message was received and propagated onwards.
Messages and responses are sent via channels. To establish this pairing, each type is expected to create, and retain ownership of, its own sending channel.
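The lock-step exchange described above can be sketched with two plain channels. This is a minimal illustration, not the Benthos implementation: `Message` and `Response` are local stand-ins declared so the example compiles on its own, and `runLockStep` and `simpleResponse` are hypothetical names introduced for the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and Response are local stand-ins for the package's types,
// declared here only so that the sketch is self-contained.
type Message struct {
	Parts [][]byte
}

type Response interface {
	Error() error
}

// simpleResponse is a hypothetical Response carrying a single error.
type simpleResponse struct{ err error }

func (r simpleResponse) Error() error { return r.err }

// runLockStep sends each payload as a message and waits for its response
// before sending the next one. It returns the number of acknowledged messages.
func runLockStep(payloads []string) (int, error) {
	msgChan := make(chan Message)  // created and closed by the producer side
	resChan := make(chan Response) // created and closed by the consumer side

	// Consumer: responds exactly once per message received.
	go func() {
		defer close(resChan)
		for msg := range msgChan {
			var resErr error
			if len(msg.Parts) == 0 {
				resErr = errors.New("empty message")
			}
			resChan <- simpleResponse{err: resErr}
		}
	}()

	// Producer: never sends message n+1 before the response to message n.
	defer close(msgChan)
	acked := 0
	for _, p := range payloads {
		msgChan <- Message{Parts: [][]byte{[]byte(p)}}
		res, open := <-resChan
		if !open {
			return acked, errors.New("response channel closed")
		}
		if err := res.Error(); err != nil {
			return acked, err
		}
		acked++
	}
	return acked, nil
}

func main() {
	acked, err := runLockStep([]string{"foo", "bar", "baz"})
	fmt.Println(acked, err) // 3 <nil>
}
```

Because each side closes only the channel it created, neither goroutine can send on a channel that the other has already closed.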
Variables ¶
var (
	ErrTimeout               = errors.New("action timed out")
	ErrChanClosed            = errors.New("channel was closed unexpectedly")
	ErrTypeClosed            = errors.New("type was closed")
	ErrInvalidBufferType     = errors.New("buffer type was not recognised")
	ErrInvalidInputType      = errors.New("input type was not recognised")
	ErrInvalidOutputType     = errors.New("output type was not recognised")
	ErrInvalidZMQType        = errors.New("invalid ZMQ socket type")
	ErrInvalidScaleProtoType = errors.New("invalid Scalability Protocols socket type")

	// ErrAlreadyStarted - When an input or output type gets started a second time.
	ErrAlreadyStarted  = errors.New("type has already been started")
	ErrBadMessageBytes = errors.New("serialised message bytes were in unexpected format")
	ErrBlockCorrupted  = errors.New("serialised messages block was in unexpected format")
)
Errors used throughout the codebase
var (
	ErrMessageTooLarge = errors.New("message body larger than buffer space")
)
Buffer errors
Types ¶
type Closable ¶
type Closable interface {
	// CloseAsync - Trigger a closure of this object but do not block until completion.
	CloseAsync()

	// WaitForClose - A blocking call to wait until the object has finished closing down and
	// cleaning up resources.
	WaitForClose(timeout time.Duration) error
}
Closable - Defines a type that can be safely closed down and cleaned up.
type Consumer ¶
type Consumer interface {
	MessageReceiver
	Responder
}
Consumer - The higher level consumer type.
type ErrUnexpectedHTTPRes ¶
ErrUnexpectedHTTPRes - Error returned when an HTTP request returned an unexpected response.
func (ErrUnexpectedHTTPRes) Error ¶
func (e ErrUnexpectedHTTPRes) Error() string
Error - Returns the Error string.
type HTTPMessage ¶
type HTTPMessage struct {
	Parts []string `json:"parts"`
}
HTTPMessage - A struct containing a benthos message, to be sent over the wire in an HTTP request. Message parts are sent as JSON strings.
type HTTPResponse ¶
type HTTPResponse struct {
	Error string `json:"error"`
}
HTTPResponse - A struct expected as an HTTP response after sending a message. The intention is to confirm that the message has been received successfully.
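To show what these two structs look like on the wire, the sketch below round-trips them through `encoding/json`. The struct definitions are copied locally so the example is self-contained; `encodeMessage` and `decodeResponse` are hypothetical helpers, and treating an empty `error` field as success is an assumption rather than documented behaviour.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the structs documented above.
type HTTPMessage struct {
	Parts []string `json:"parts"`
}

type HTTPResponse struct {
	Error string `json:"error"`
}

// encodeMessage builds the JSON request body for a multi-part message.
func encodeMessage(parts ...string) (string, error) {
	b, err := json.Marshal(HTTPMessage{Parts: parts})
	return string(b), err
}

// decodeResponse parses the JSON body returned by the receiving side.
func decodeResponse(body string) (HTTPResponse, error) {
	var res HTTPResponse
	err := json.Unmarshal([]byte(body), &res)
	return res, err
}

func main() {
	body, _ := encodeMessage("hello", "world")
	fmt.Println(body) // {"parts":["hello","world"]}

	// Assumption: an empty error string signals successful receipt.
	res, _ := decodeResponse(`{"error":""}`)
	fmt.Println(res.Error == "") // true
}
```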
type MappedResponse ¶
MappedResponse - Returned by a broker to provide a map of errors representing agent errors.
func NewMappedResponse ¶
func NewMappedResponse() *MappedResponse
NewMappedResponse - Returns a response tailored for a broker (with n agents).
func (*MappedResponse) Error ¶
func (b *MappedResponse) Error() error
Error - Returns nil if no errors are present, otherwise a concatenated blob of errors.
func (*MappedResponse) ErrorMap ¶
func (b *MappedResponse) ErrorMap() map[int]error
ErrorMap - Returns a map of errors returned by agents, represented by index.
type Message ¶
type Message struct {
	Parts [][]byte `json:"parts"`
}
Message - A struct containing any relevant fields of a benthos message and helper functions.
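Unlike `HTTPMessage`, the parts here are byte slices, which `encoding/json` encodes as base64 strings. The sketch below illustrates only that standard-library behaviour; it does not reproduce the package's own helper functions, and `marshalMessage` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the struct documented above.
type Message struct {
	Parts [][]byte `json:"parts"`
}

// marshalMessage converts string payloads into message parts and encodes
// the result as JSON.
func marshalMessage(parts ...string) (string, error) {
	msg := Message{}
	for _, p := range parts {
		msg.Parts = append(msg.Parts, []byte(p))
	}
	b, err := json.Marshal(msg)
	return string(b), err
}

func main() {
	out, _ := marshalMessage("foo", "bar")
	fmt.Println(out) // {"parts":["Zm9v","YmFy"]}
}
```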
type MessageReceiver ¶
type MessageReceiver interface {
	// StartReceiving - Starts the type receiving messages from a channel.
	StartReceiving(<-chan Message) error
}
MessageReceiver - A type that receives messages from an input.
type MessageSender ¶
type MessageSender interface {
	// MessageChan - Returns the channel used for consuming messages from this input.
	MessageChan() <-chan Message
}
MessageSender - A type that sends messages to an output.
type Producer ¶
type Producer interface {
	MessageSender
	ResponderListener
}
Producer - The higher level producer type.
type Responder ¶
type Responder interface {
	// ResponseChan - Returns a channel that yields a response for every input message received.
	ResponseChan() <-chan Response
}
Responder - Defines a type that will send a response every time a message is received.
type ResponderListener ¶
type ResponderListener interface {
	// StartListening - Starts the type listening to a channel.
	StartListening(<-chan Response) error
}
ResponderListener - A type that listens to a Responder type.
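The four small interfaces above combine so that a Producer and a Consumer can be wired together by exchanging their channels. The following is a rough sketch of that wiring under stated assumptions: `source`, `sink`, `ack` and `connect` are hypothetical, the interfaces are re-declared locally in flattened form, and `Response` is reduced to a single `Error` method for brevity.

```go
package main

import "fmt"

// Local stand-ins so the sketch compiles on its own.
type Message struct{ Parts [][]byte }

type Response interface{ Error() error }

type ack struct{}

func (ack) Error() error { return nil }

// Producer and Consumer mirror the documented interfaces with their
// embedded interfaces flattened.
type Producer interface {
	MessageChan() <-chan Message
	StartListening(<-chan Response) error
}

type Consumer interface {
	StartReceiving(<-chan Message) error
	ResponseChan() <-chan Response
}

// source is a hypothetical Producer that emits a fixed list of payloads.
type source struct {
	payloads []string
	messages chan Message
	done     chan struct{}
}

func newSource(payloads ...string) *source {
	return &source{payloads: payloads, messages: make(chan Message), done: make(chan struct{})}
}

func (s *source) MessageChan() <-chan Message { return s.messages }

func (s *source) StartListening(responses <-chan Response) error {
	go func() {
		defer close(s.done)     // runs last: signals that all sends are finished
		defer close(s.messages) // runs first: the producer owns its message channel
		for _, p := range s.payloads {
			s.messages <- Message{Parts: [][]byte{[]byte(p)}}
			if _, open := <-responses; !open {
				return
			}
		}
	}()
	return nil
}

// sink is a hypothetical Consumer that counts the messages it receives.
type sink struct {
	responses chan Response
	count     int
}

func newSink() *sink { return &sink{responses: make(chan Response)} }

func (k *sink) ResponseChan() <-chan Response { return k.responses }

func (k *sink) StartReceiving(messages <-chan Message) error {
	go func() {
		defer close(k.responses) // the consumer owns its response channel
		for range messages {
			k.count++
			k.responses <- ack{}
		}
	}()
	return nil
}

// connect hands each side the other's sending channel.
func connect(p Producer, c Consumer) error {
	if err := c.StartReceiving(p.MessageChan()); err != nil {
		return err
	}
	return p.StartListening(c.ResponseChan())
}

func main() {
	src, snk := newSource("a", "b", "c"), newSink()
	if err := connect(src, snk); err != nil {
		panic(err)
	}
	<-src.done
	fmt.Println(snk.count) // 3
}
```

Since both sides depend only on the interfaces, `connect` works for any input and output pairing, which is the point of keeping the types abstract.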
type Response ¶
Response - A response from an output, agent or broker that confirms to the input that a message was received successfully.
type SimpleResponse ¶
type SimpleResponse struct {
	// contains filtered or unexported fields
}
SimpleResponse - Returned by an output or agent to provide a single return message.
func NewSimpleResponse ¶
func NewSimpleResponse(err error) *SimpleResponse
NewSimpleResponse - Returns a response with an error (nil error signals successful receipt).
func (*SimpleResponse) Error ¶
func (o *SimpleResponse) Error() error
Error - Returns the underlying error.
func (*SimpleResponse) ErrorMap ¶
func (o *SimpleResponse) ErrorMap() map[int]error
ErrorMap - Returns nil.