types

package
v0.7.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2018 License: MIT Imports: 6 Imported by: 45

Documentation

Overview

Package types defines any general structs and interfaces used throughout the benthos code base.

Benthos uses abstract types to represent arbitrary producers and consumers of data to its core components. This allows us to construct types for piping data in various arrangements without regard for the specific destinations and sources of our data.

The basic principle behind a producer/consumer relationship is that a producer pipes data to the consumer in lock-step, where for each message sent it will expect a response that confirms the message was received and propagated onwards.

Messages and responses are sent via channels, and in order to instigate this pairing each type is expected to create and maintain ownership of its respective sending channel.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTimeout    = errors.New("action timed out")
	ErrChanClosed = errors.New("channel was closed unexpectedly")
	ErrTypeClosed = errors.New("type was closed")

	ErrNotConnected = errors.New("not connected to target source or sink")

	ErrInvalidProcessorType = errors.New("processor type was not recognised")
	ErrInvalidBufferType    = errors.New("buffer type was not recognised")
	ErrInvalidInputType     = errors.New("input type was not recognised")
	ErrInvalidOutputType    = errors.New("output type was not recognised")

	ErrInvalidZMQType        = errors.New("invalid ZMQ socket type")
	ErrInvalidScaleProtoType = errors.New("invalid Scalability Protocols socket type")

	// ErrAlreadyStarted is returned when an input or output type gets started a
	// second time.
	ErrAlreadyStarted = errors.New("type has already been started")

	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes     = errors.New("serialised message bytes were in unexpected format")
	ErrBlockCorrupted      = errors.New("serialised messages block was in unexpected format")

	ErrNoAck = errors.New("failed to receive acknowledgement")
)

Errors used throughout the codebase

View Source
var (
	ErrMessageTooLarge = errors.New("message body larger than buffer space")
)

Buffer errors

Functions

This section is empty.

Types

type Closable

type Closable interface {
	// CloseAsync triggers a closure of this object but does not block until
	// completion.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the object has finished
	// closing down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}

Closable defines a type that can be safely closed down and cleaned up.

type Consumer

type Consumer interface {
	MessageReceiver
	Responder
}

Consumer is the higher level consumer type.

type ErrUnexpectedHTTPRes

type ErrUnexpectedHTTPRes struct {
	Code int
	S    string
}

ErrUnexpectedHTTPRes is an error returned when an HTTP request returned an unexpected response.

func (ErrUnexpectedHTTPRes) Error

func (e ErrUnexpectedHTTPRes) Error() string

Error returns the Error string.

type HTTPMessage

type HTTPMessage struct {
	Parts []string `json:"parts"`
}

HTTPMessage is a struct containing a benthos message, to be sent over the wire in an HTTP request. Message parts are sent as JSON strings.

type HTTPResponse

type HTTPResponse struct {
	Error string `json:"error"`
}

HTTPResponse is a struct expected as an HTTP response after sending a message. The intention is to confirm that the message has been received successfully.

type Message

type Message struct {
	Parts [][]byte `json:"parts"`
	// contains filtered or unexported fields
}

Message is a struct containing any relevant fields of a benthos message and helper functions.

func FromBytes

func FromBytes(b []byte) (Message, error)

FromBytes deserialises a Message from a byte array.

func NewMessage

func NewMessage() Message

NewMessage initializes an empty message.

func (*Message) Bytes

func (m *Message) Bytes() []byte

Bytes serialises the message into a single byte array.

func (*Message) GetJSON added in v0.6.19

func (m *Message) GetJSON(part int) (interface{}, error)

GetJSON returns a message part parsed into an `interface{}` type. This is lazily evaluated and the result is cached. If multiple layers of a pipeline extract the same part as JSON it will only be unmarshalled once, unless the content of the part has changed.

func (*Message) SetJSON added in v0.6.19

func (m *Message) SetJSON(part int, jObj interface{}) error

SetJSON sets a message part to the marshalled bytes of a JSON object, but also caches the object itself. If the JSON contents of a part is subsequently queried it will receive the cached version as long as the raw content has not changed.

func (*Message) ShallowCopy added in v0.7.2

func (m *Message) ShallowCopy() Message

ShallowCopy creates a new shallow copy of the message. Parts can be re-arranged in the new copy and JSON parts can be get/set without impacting other message copies. However, it is still unsafe to edit the content of parts.

type MessageReceiver

type MessageReceiver interface {
	// StartReceiving starts the type receiving messages from a channel.
	StartReceiving(<-chan Message) error
}

MessageReceiver is a type that receives messages from an input.

type MessageSender

type MessageSender interface {
	// MessageChan returns the channel used for consuming messages from this
	// input.
	MessageChan() <-chan Message
}

MessageSender is a type that sends messages to an output.

type Producer

type Producer interface {
	MessageSender
	ResponderListener
}

Producer is the higher level producer type.

type Responder

type Responder interface {
	// ResponseChan returns a response for every input message received.
	ResponseChan() <-chan Response
}

Responder defines a type that will send a response every time a message is received.

type ResponderListener

type ResponderListener interface {
	// StartListening starts the type listening to a channel.
	StartListening(<-chan Response) error
}

ResponderListener is a type that listens to a Responder type.

type Response

type Response interface {
	// Error returns a non-nil error if the message failed to reach its
	// destination.
	Error() error

	// SkipAck indicates that even though there may not have been an error in
	// processing a message, it should not be acknowledged. If SkipAck is false
	// and Error is nil then all unacknowledged messages should be acknowledged
	// also.
	SkipAck() bool
}

Response is a response from an output, agent or broker that confirms the input of successful message receipt.

type SimpleResponse

type SimpleResponse struct {
	// contains filtered or unexported fields
}

SimpleResponse is returned by an output or agent to provide a single return message.

func NewSimpleResponse

func NewSimpleResponse(err error) SimpleResponse

NewSimpleResponse returns a response with an error (nil error signals successful receipt).

func (SimpleResponse) Error

func (o SimpleResponse) Error() error

Error returns the underlying error.

func (SimpleResponse) SkipAck added in v0.3.1

func (o SimpleResponse) SkipAck() bool

SkipAck indicates whether a successful message should be acknowledged.

type UnacknowledgedResponse added in v0.3.1

type UnacknowledgedResponse struct{}

UnacknowledgedResponse is a response type that indicates the message has reached its destination but should not be acknowledged.

func NewUnacknowledgedResponse added in v0.3.1

func NewUnacknowledgedResponse() UnacknowledgedResponse

NewUnacknowledgedResponse returns an UnacknowledgedResponse.

func (UnacknowledgedResponse) Error added in v0.3.1

func (u UnacknowledgedResponse) Error() error

Error returns the underlying error.

func (UnacknowledgedResponse) SkipAck added in v0.3.1

func (u UnacknowledgedResponse) SkipAck() bool

SkipAck indicates whether a successful message should be acknowledged.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL