Documentation ¶
Overview ¶
Package types defines the general structs and interfaces used throughout the Benthos codebase.
Benthos uses abstract types to represent arbitrary producers and consumers of data to its core components. This allows us to construct types for piping data in various arrangements without regard for the specific destinations and sources of our data.
The basic principle behind a producer/consumer relationship is that a producer pipes data to the consumer in lock-step, where for each message sent it will expect a response that confirms the message was received and propagated onwards.
Messages and responses are sent via channels. To establish this pairing, each type is expected to create and own its respective sending channel.
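The lock-step exchange described above can be sketched with plain channels. This is a minimal illustration, not the package's implementation: `Message` and `Response` here are simplified local stand-ins, and `runConsumer` and `produce` are hypothetical helpers introduced only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and Response are simplified stand-ins for the package's types.
type Message struct {
	Parts [][]byte
}

type Response struct {
	Err error
}

// runConsumer (hypothetical) reads messages and answers each one with exactly
// one response. It creates and closes its own response channel.
func runConsumer(msgs <-chan Message) <-chan Response {
	resps := make(chan Response)
	go func() {
		defer close(resps)
		for m := range msgs {
			var err error
			if len(m.Parts) == 0 {
				err = errors.New("empty message")
			}
			resps <- Response{Err: err}
		}
	}()
	return resps
}

// produce (hypothetical) sends each payload and waits for its response before
// sending the next one. It returns how many messages were confirmed.
func produce(payloads []string) int {
	msgs := make(chan Message) // the producer owns its sending channel
	resps := runConsumer(msgs)

	acked := 0
	for _, p := range payloads {
		msgs <- Message{Parts: [][]byte{[]byte(p)}}
		if res := <-resps; res.Err == nil {
			acked++
		}
	}

	close(msgs)
	for range resps {
		// Drain until the consumer closes its channel.
	}
	return acked
}

func main() {
	fmt.Println(produce([]string{"foo", "bar", "baz"}))
}
```

Because both channels are unbuffered, the producer cannot send a second message until the response to the first has been read, which is the lock-step behaviour the overview describes.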
Constants ¶
This section is empty.
Variables ¶
var (
	ErrTimeout = errors.New("action timed out")
	ErrChanClosed = errors.New("channel was closed unexpectedly")
	ErrTypeClosed = errors.New("type was closed")

	ErrNotConnected = errors.New("not connected to target source or sink")

	ErrInvalidProcessorType = errors.New("processor type was not recognised")
	ErrInvalidBufferType = errors.New("buffer type was not recognised")
	ErrInvalidInputType = errors.New("input type was not recognised")
	ErrInvalidOutputType = errors.New("output type was not recognised")

	ErrInvalidZMQType = errors.New("invalid ZMQ socket type")
	ErrInvalidScaleProtoType = errors.New("invalid Scalability Protocols socket type")

	// ErrAlreadyStarted is returned when an input or output type gets started a
	// second time.
	ErrAlreadyStarted = errors.New("type has already been started")

	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes = errors.New("serialised message bytes were in unexpected format")
	ErrBlockCorrupted = errors.New("serialised messages block was in unexpected format")

	ErrNoAck = errors.New("failed to receive acknowledgement")
)
Errors used throughout the codebase
var (
	ErrMessageTooLarge = errors.New("message body larger than buffer space")
)
Buffer errors
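Since these are sentinel values created with `errors.New`, callers can compare against them to decide how to react. The sketch below is one possible pattern, not something the package prescribes: `ErrTimeout` is redeclared locally so the example is self-contained, and `send` and `sendWithRetry` are hypothetical functions.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTimeout mirrors the sentinel of the same name; it is redeclared here
// only so that the sketch compiles on its own.
var ErrTimeout = errors.New("action timed out")

// send is a hypothetical operation that times out on its first two attempts.
func send(attempt int) error {
	if attempt < 2 {
		return ErrTimeout
	}
	return nil
}

// sendWithRetry (hypothetical) retries while the error is ErrTimeout and
// stops on success or on any other error. It returns the attempts used.
func sendWithRetry(maxAttempts int) (int, error) {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err := send(attempt)
		switch {
		case err == nil:
			return attempt + 1, nil
		case errors.Is(err, ErrTimeout):
			continue // retry on timeout
		default:
			return attempt + 1, err
		}
	}
	return maxAttempts, ErrTimeout
}

func main() {
	attempts, err := sendWithRetry(5)
	fmt.Println(attempts, err)
}
```

`errors.Is` also matches when the sentinel has been wrapped with `fmt.Errorf("...: %w", err)`, which a plain `==` comparison would miss.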
Functions ¶
This section is empty.
Types ¶
type Closable ¶
type Closable interface {
	// CloseAsync triggers a closure of this object but does not block until
	// completion.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the object has finished
	// closing down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}
Closable defines a type that can be safely closed down and cleaned up.
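One way a type might satisfy this interface is with a pair of signalling channels. The following is a sketch under assumptions: `worker` and `newWorker` are hypothetical, `ErrTimeout` is redeclared locally, and returning `ErrTimeout` when the deadline passes is an assumed convention rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrTimeout is redeclared locally so the sketch is self-contained.
var ErrTimeout = errors.New("action timed out")

// worker is a hypothetical type with the Closable method set.
type worker struct {
	closeChan  chan struct{} // closed to request shutdown
	closedChan chan struct{} // closed once shutdown has finished
	closeOnce  sync.Once
}

func newWorker() *worker {
	w := &worker{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go func() {
		defer close(w.closedChan)
		<-w.closeChan // a real worker would process data until asked to stop
	}()
	return w
}

// CloseAsync requests shutdown without blocking. It is safe to call twice.
func (w *worker) CloseAsync() {
	w.closeOnce.Do(func() { close(w.closeChan) })
}

// WaitForClose blocks until shutdown has finished or the timeout elapses.
func (w *worker) WaitForClose(timeout time.Duration) error {
	select {
	case <-w.closedChan:
		return nil
	case <-time.After(timeout):
		return ErrTimeout
	}
}

func main() {
	w := newWorker()
	w.CloseAsync()
	fmt.Println(w.WaitForClose(time.Second))
}
```

Splitting the request (`CloseAsync`) from the wait (`WaitForClose`) lets a caller trigger the shutdown of many components first and then wait for each in turn, so the shutdowns overlap.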
type Consumer ¶
type Consumer interface {
	MessageReceiver
	Responder
}
Consumer is the higher level consumer type.
type ErrUnexpectedHTTPRes ¶
ErrUnexpectedHTTPRes is an error returned when an HTTP request returned an unexpected response.
func (ErrUnexpectedHTTPRes) Error ¶
func (e ErrUnexpectedHTTPRes) Error() string
Error returns the Error string.
type HTTPMessage ¶
type HTTPMessage struct {
	Parts []string `json:"parts"`
}
HTTPMessage is a struct containing a benthos message, to be sent over the wire in an HTTP request. Message parts are sent as JSON strings.
type HTTPResponse ¶
type HTTPResponse struct {
	Error string `json:"error"`
}
HTTPResponse is a struct expected as an HTTP response after sending a message. The intention is to confirm that the message has been received successfully.
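The wire format implied by the struct tags can be shown with `encoding/json`. In this sketch the two structs are copied locally so it runs on its own, and `encodeMessage` and `decodeResponse` are hypothetical helpers. Treating an empty `Error` string as success is an assumption; the documentation above does not say so explicitly.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the two structs documented above.
type HTTPMessage struct {
	Parts []string `json:"parts"`
}

type HTTPResponse struct {
	Error string `json:"error"`
}

// encodeMessage (hypothetical) builds the JSON request body for a message.
func encodeMessage(parts ...string) (string, error) {
	b, err := json.Marshal(HTTPMessage{Parts: parts})
	return string(b), err
}

// decodeResponse (hypothetical) parses the JSON body of a response.
func decodeResponse(body string) (HTTPResponse, error) {
	var res HTTPResponse
	err := json.Unmarshal([]byte(body), &res)
	return res, err
}

func main() {
	body, err := encodeMessage("hello", "world")
	if err != nil {
		panic(err)
	}
	fmt.Println(body)

	res, err := decodeResponse(`{"error":"buffer full"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(res.Error)
}
```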
type Message ¶
type Message struct {
	Parts [][]byte `json:"parts"`
	// contains filtered or unexported fields
}
Message is a struct containing any relevant fields of a benthos message and helper functions.
func (*Message) GetJSON ¶ added in v0.6.19
GetJSON returns a message part parsed into an `interface{}` type. This is lazily evaluated and the result is cached. If multiple layers of a pipeline extract the same part as JSON it will only be unmarshalled once, unless the content of the part has changed.
func (*Message) SetJSON ¶ added in v0.6.19
SetJSON sets a message part to the marshalled bytes of a JSON object, and also caches the object itself. If the JSON content of that part is subsequently queried, the cached version is returned as long as the raw content has not changed.
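The caching behaviour described for GetJSON and SetJSON can be illustrated with a small stand-alone type. This is only a sketch of the idea: `jsonCache` and its methods are hypothetical, handle a single part, and make no claim about how `Message` tracks changes internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonCache is a hypothetical single-part cache illustrating lazy parsing.
type jsonCache struct {
	raw    []byte
	parsed interface{}
	valid  bool
	parses int // counts how often the raw bytes were actually unmarshalled
}

// Get parses the raw bytes on first use and returns the cached value after.
func (c *jsonCache) Get() (interface{}, error) {
	if c.valid {
		return c.parsed, nil
	}
	var v interface{}
	if err := json.Unmarshal(c.raw, &v); err != nil {
		return nil, err
	}
	c.parses++
	c.parsed, c.valid = v, true
	return v, nil
}

// SetRaw replaces the raw content, which invalidates the cached value.
func (c *jsonCache) SetRaw(b []byte) {
	c.raw, c.valid = b, false
}

// SetJSON stores the marshalled bytes and caches the object itself.
func (c *jsonCache) SetJSON(v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	c.raw, c.parsed, c.valid = b, v, true
	return nil
}

func main() {
	c := &jsonCache{raw: []byte(`{"a":1}`)}
	c.Get()
	c.Get()
	fmt.Println(c.parses) // the second Get is served from the cache
}
```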
func (*Message) ShallowCopy ¶ added in v0.7.2
ShallowCopy creates a new shallow copy of the message. Parts can be rearranged in the new copy, and JSON parts can be read or set, without affecting other copies of the message. However, it is still unsafe to edit the content of individual parts, because the underlying bytes are shared between copies.
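The distinction between rearranging parts and editing their content follows from how Go slices work. The sketch below uses a simplified local `Message` and a hypothetical `shallowCopy` function; it shows the general slice semantics and is not the package's actual implementation.

```go
package main

import "fmt"

// Message is a simplified stand-in with only the exported field.
type Message struct {
	Parts [][]byte
}

// shallowCopy (hypothetical) copies the outer slice but not the byte slices
// it holds, so both messages still point at the same part contents.
func shallowCopy(m Message) Message {
	parts := make([][]byte, len(m.Parts))
	copy(parts, m.Parts)
	return Message{Parts: parts}
}

func main() {
	orig := Message{Parts: [][]byte{[]byte("foo"), []byte("bar")}}
	cp := shallowCopy(orig)

	// Safe: reordering parts in the copy leaves the original order intact.
	cp.Parts[0], cp.Parts[1] = cp.Parts[1], cp.Parts[0]
	fmt.Printf("%s %s\n", orig.Parts[0], cp.Parts[0]) // prints "foo bar"

	// Unsafe: editing bytes through the copy also changes the original.
	cp.Parts[0][0] = 'X'
	fmt.Printf("%s\n", orig.Parts[1]) // prints "Xar"
}
```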
type MessageReceiver ¶
type MessageReceiver interface {
	// StartReceiving starts the type receiving messages from a channel.
	StartReceiving(<-chan Message) error
}
MessageReceiver is a type that receives messages from an input.
type MessageSender ¶
type MessageSender interface {
	// MessageChan returns the channel used for consuming messages from this
	// input.
	MessageChan() <-chan Message
}
MessageSender is a type that sends messages to an output.
type Producer ¶
type Producer interface {
	MessageSender
	ResponderListener
}
Producer is the higher level producer type.
type Responder ¶
type Responder interface {
	// ResponseChan returns a response for every input message received.
	ResponseChan() <-chan Response
}
Responder defines a type that will send a response every time a message is received.
type ResponderListener ¶
type ResponderListener interface {
	// StartListening starts the type listening to a channel.
	StartListening(<-chan Response) error
}
ResponderListener is a type that listens to a Responder type.
type Response ¶
type Response interface {
	// Error returns a non-nil error if the message failed to reach its
	// destination.
	Error() error

	// SkipAck indicates that even though there may not have been an error in
	// processing a message, it should not be acknowledged. If SkipAck is false
	// and Error is nil then all unacknowledged messages should be acknowledged
	// also.
	SkipAck() bool
}
Response is a response from an output, agent or broker that confirms to the input whether a message was received successfully.
type SimpleResponse ¶
type SimpleResponse struct {
	// contains filtered or unexported fields
}
SimpleResponse is returned by an output or agent to provide a single return message.
func NewSimpleResponse ¶
func NewSimpleResponse(err error) SimpleResponse
NewSimpleResponse returns a response with an error (nil error signals successful receipt).
func (SimpleResponse) Error ¶
func (o SimpleResponse) Error() error
Error returns the underlying error.
func (SimpleResponse) SkipAck ¶ added in v0.3.1
func (o SimpleResponse) SkipAck() bool
SkipAck indicates whether a successful message should be acknowledged.
type UnacknowledgedResponse ¶ added in v0.3.1
type UnacknowledgedResponse struct{}
UnacknowledgedResponse is a response type that indicates the message has reached its destination but should not be acknowledged.
func NewUnacknowledgedResponse ¶ added in v0.3.1
func NewUnacknowledgedResponse() UnacknowledgedResponse
NewUnacknowledgedResponse returns an UnacknowledgedResponse.
func (UnacknowledgedResponse) Error ¶ added in v0.3.1
func (u UnacknowledgedResponse) Error() error
Error returns the underlying error.
func (UnacknowledgedResponse) SkipAck ¶ added in v0.3.1
func (u UnacknowledgedResponse) SkipAck() bool
SkipAck indicates whether a successful message should be acknowledged.