types

package
v0.9.1
Published: Mar 2, 2018 License: MIT Imports: 7 Imported by: 45

Documentation

Overview

Package types defines the general structs and interfaces used throughout the Benthos codebase.

Benthos uses abstract types to represent arbitrary producers and consumers of data to its core components. This allows us to construct types for piping data in various arrangements without regard for the specific destinations and sources of our data.

The basic principle behind a producer/consumer relationship is that a producer pipes data to the consumer in lock-step, where for each message sent it will expect a response that confirms the message was received and propagated onwards.

Messages and responses are sent via channels, and in order to instigate this pairing each type is expected to create and maintain ownership of its respective sending channel.
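
The lock-step exchange described above can be sketched with plain channels. This is a minimal illustration, not code taken from Benthos: the types below are local stand-ins that only mirror the shapes documented on this page, and the helper names (produce, consume, runLockStep) and the rule that rejects empty parts are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the shapes documented in this package.
// They are not imported from Benthos.
type Message struct {
	Parts [][]byte
}

type Response interface {
	Error() error
}

type simpleResponse struct{ err error }

func (r simpleResponse) Error() error { return r.err }

type Transaction struct {
	Payload      Message
	ResponseChan chan<- Response
}

// produce sends each payload in lock-step: it waits for the response to one
// transaction before sending the next. The producer closes its sending
// channel once it has nothing left to send.
func produce(payloads []string, tChan chan<- Transaction) []error {
	defer close(tChan)
	resChan := make(chan Response)
	results := make([]error, 0, len(payloads))
	for _, p := range payloads {
		tChan <- Transaction{
			Payload:      Message{Parts: [][]byte{[]byte(p)}},
			ResponseChan: resChan,
		}
		res := <-resChan // block until the consumer resolves the transaction
		results = append(results, res.Error())
	}
	return results
}

// consume resolves every transaction it receives. Rejecting empty parts is a
// hypothetical rule used only to demonstrate a failed response.
func consume(tChan <-chan Transaction) {
	for t := range tChan {
		var err error
		if len(t.Payload.Parts[0]) == 0 {
			err = errors.New("empty message part")
		}
		t.ResponseChan <- simpleResponse{err: err}
	}
}

// runLockStep wires one producer to one consumer and returns the error (or
// nil) reported for each payload, in order.
func runLockStep(payloads []string) []error {
	tChan := make(chan Transaction)
	go consume(tChan)
	return produce(payloads, tChan)
}

func main() {
	for i, err := range runLockStep([]string{"hello", "", "world"}) {
		fmt.Println(i, err)
	}
}
```

Because both channels are unbuffered, the producer cannot get ahead of the consumer: every send is paired with exactly one response before the next message moves.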

Constants

This section is empty.

Variables

var (
	ErrTimeout    = errors.New("action timed out")
	ErrChanClosed = errors.New("channel was closed unexpectedly")
	ErrTypeClosed = errors.New("type was closed")

	ErrNotConnected = errors.New("not connected to target source or sink")

	ErrInvalidProcessorType = errors.New("processor type was not recognised")
	ErrInvalidConditionType = errors.New("condition type was not recognised")
	ErrInvalidBufferType    = errors.New("buffer type was not recognised")
	ErrInvalidInputType     = errors.New("input type was not recognised")
	ErrInvalidOutputType    = errors.New("output type was not recognised")

	ErrInvalidZMQType        = errors.New("invalid ZMQ socket type")
	ErrInvalidScaleProtoType = errors.New("invalid Scalability Protocols socket type")

	// ErrAlreadyStarted is returned when an input or output type gets started a
	// second time.
	ErrAlreadyStarted = errors.New("type has already been started")

	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes     = errors.New("serialised message bytes were in unexpected format")
	ErrBlockCorrupted      = errors.New("serialised messages block was in unexpected format")

	ErrNoAck = errors.New("failed to receive acknowledgement")
)

Errors used throughout the codebase

var (
	ErrMessageTooLarge = errors.New("message body larger than buffer space")
)

Buffer errors

Functions

This section is empty.

Types

type Closable

type Closable interface {
	// CloseAsync triggers a closure of this object but does not block until
	// completion.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the object has finished
	// closing down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}

Closable defines a type that can be safely closed down and cleaned up.
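
One way a component might satisfy this method set is sketched below. The worker type, its fields, and the demo helper are hypothetical, and returning a timeout error from WaitForClose is an assumption (the interface itself only promises an error value); errTimeout is a local stand-in for the package's ErrTimeout.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errTimeout is a local stand-in for the package's ErrTimeout.
var errTimeout = errors.New("action timed out")

// worker is a hypothetical component with the Closable method set.
type worker struct {
	closeOnce  sync.Once
	closeChan  chan struct{} // closed to request shutdown
	closedChan chan struct{} // closed once shutdown has finished
}

func newWorker() *worker {
	w := &worker{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go w.loop()
	return w
}

// loop stands in for the component's real work; here it only waits for the
// shutdown request and then signals completion.
func (w *worker) loop() {
	defer close(w.closedChan)
	<-w.closeChan
}

// CloseAsync requests shutdown without blocking. sync.Once makes repeated
// calls harmless.
func (w *worker) CloseAsync() {
	w.closeOnce.Do(func() { close(w.closeChan) })
}

// WaitForClose blocks until shutdown completes or the timeout elapses.
func (w *worker) WaitForClose(timeout time.Duration) error {
	select {
	case <-w.closedChan:
		return nil
	case <-time.After(timeout):
		return errTimeout
	}
}

// demo waits once before requesting closure and once after.
func demo() (beforeClose, afterClose error) {
	w := newWorker()
	beforeClose = w.WaitForClose(10 * time.Millisecond)
	w.CloseAsync()
	w.CloseAsync() // second call is a no-op
	afterClose = w.WaitForClose(time.Second)
	return beforeClose, afterClose
}

func main() {
	before, after := demo()
	fmt.Println("before CloseAsync:", before)
	fmt.Println("after CloseAsync:", after)
}
```

Splitting the request (CloseAsync) from the wait (WaitForClose) lets a caller trigger shutdown on many components first and only then wait for each of them, so they shut down concurrently.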

type Consumer

type Consumer interface {
	TransactionReceiver
}

Consumer is the higher level consumer type.

type ErrUnexpectedHTTPRes

type ErrUnexpectedHTTPRes struct {
	Code int
	S    string
}

ErrUnexpectedHTTPRes is an error returned when an HTTP request returned an unexpected response.

func (ErrUnexpectedHTTPRes) Error

func (e ErrUnexpectedHTTPRes) Error() string

Error returns the Error string.

type HTTPMessage

type HTTPMessage struct {
	Parts []string `json:"parts"`
}

HTTPMessage is a struct containing a benthos message, to be sent over the wire in an HTTP request. Message parts are sent as JSON strings.

type HTTPResponse

type HTTPResponse struct {
	Error string `json:"error"`
}

HTTPResponse is a struct expected as an HTTP response after sending a message. The intention is to confirm that the message has been received successfully.
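
The wire format implied by the struct tags on HTTPMessage and HTTPResponse can be checked with encoding/json. The two structs below are local copies of the documented definitions. The helpers encodeMessage and decodeResponse are hypothetical, and treating an empty error field as success is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented struct definitions.
type HTTPMessage struct {
	Parts []string `json:"parts"`
}

type HTTPResponse struct {
	Error string `json:"error"`
}

// encodeMessage builds the JSON request body for a multipart message.
func encodeMessage(parts ...string) ([]byte, error) {
	return json.Marshal(HTTPMessage{Parts: parts})
}

// decodeResponse parses a response body. Treating an empty Error field as
// success is an assumption made for this sketch.
func decodeResponse(body []byte) (ok bool, err error) {
	var res HTTPResponse
	if err = json.Unmarshal(body, &res); err != nil {
		return false, err
	}
	return res.Error == "", nil
}

func main() {
	body, err := encodeMessage("foo", "bar")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"parts":["foo","bar"]}

	ok, err := decodeResponse([]byte(`{"error":""}`))
	fmt.Println(ok, err) // true <nil>
}
```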

type Input added in v0.9.0

type Input interface {
	Producer
	Closable
}

Input is a closable producer.

type Manager added in v0.8.0

type Manager interface {
	// RegisterEndpoint registers a server wide HTTP endpoint.
	RegisterEndpoint(path, desc string, h http.HandlerFunc)
}

Manager is an interface expected by Benthos components that allows them to register their service wide behaviours such as HTTP endpoints and event listeners.

type Message

type Message struct {
	Parts [][]byte `json:"parts"`
	// contains filtered or unexported fields
}

Message is a struct containing any relevant fields of a benthos message and helper functions.

func FromBytes

func FromBytes(b []byte) (Message, error)

FromBytes deserialises a Message from a byte slice.

func NewMessage

func NewMessage() Message

NewMessage initializes an empty message.

func (*Message) Bytes

func (m *Message) Bytes() []byte

Bytes serialises the message into a single byte slice.

func (*Message) DeepCopy added in v0.9.1

func (m *Message) DeepCopy() Message

DeepCopy creates a new deep copy of the message. This can be considered an entirely new object that is safe to use anywhere.

func (*Message) GetJSON added in v0.6.19

func (m *Message) GetJSON(part int) (interface{}, error)

GetJSON returns a message part parsed into an `interface{}` type. This is lazily evaluated and the result is cached. If multiple layers of a pipeline extract the same part as JSON it will only be unmarshalled once, unless the content of the part has changed.

func (*Message) SetJSON added in v0.6.19

func (m *Message) SetJSON(part int, jObj interface{}) error

SetJSON sets a message part to the marshalled bytes of a JSON object and also caches the object itself. If the JSON contents of that part are subsequently queried, the cached version is returned as long as the raw content has not changed.
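
The caching behaviour of GetJSON and SetJSON can be illustrated with a simplified stand-in. cachedMessage below is hypothetical and is not the real Message: its parses counter exists only to make the caching visible, returning a "part does not exist" error for a bad index is an assumption, and it does not detect direct edits to the raw bytes of a part, which the documentation above says the real type accounts for.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errPartNotExist is a local stand-in for ErrMessagePartNotExist.
var errPartNotExist = errors.New("target message part does not exist")

// cachedMessage is a hypothetical, simplified stand-in for Message.
type cachedMessage struct {
	Parts  [][]byte
	cache  map[int]interface{} // parsed JSON keyed by part index
	parses int                 // number of times a part was unmarshalled
}

// GetJSON parses a part on first use and serves later calls from the cache.
func (m *cachedMessage) GetJSON(part int) (interface{}, error) {
	if part < 0 || part >= len(m.Parts) {
		return nil, errPartNotExist
	}
	if v, ok := m.cache[part]; ok {
		return v, nil
	}
	var v interface{}
	if err := json.Unmarshal(m.Parts[part], &v); err != nil {
		return nil, err
	}
	m.parses++
	m.store(part, v)
	return v, nil
}

// SetJSON writes the marshalled bytes and caches the object in one step, so a
// following GetJSON does not need to parse the bytes again.
func (m *cachedMessage) SetJSON(part int, jObj interface{}) error {
	if part < 0 || part >= len(m.Parts) {
		return errPartNotExist
	}
	b, err := json.Marshal(jObj)
	if err != nil {
		return err
	}
	m.Parts[part] = b
	m.store(part, jObj)
	return nil
}

func (m *cachedMessage) store(part int, v interface{}) {
	if m.cache == nil {
		m.cache = make(map[int]interface{})
	}
	m.cache[part] = v
}

func main() {
	m := &cachedMessage{Parts: [][]byte{[]byte(`{"a":1}`)}}
	for i := 0; i < 2; i++ {
		if _, err := m.GetJSON(0); err != nil {
			panic(err)
		}
	}
	fmt.Println("parses:", m.parses) // parses: 1

	if err := m.SetJSON(0, map[string]interface{}{"b": 2}); err != nil {
		panic(err)
	}
	if _, err := m.GetJSON(0); err != nil {
		panic(err)
	}
	fmt.Println(string(m.Parts[0]), m.parses) // {"b":2} 1
}
```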

func (*Message) ShallowCopy added in v0.7.2

func (m *Message) ShallowCopy() Message

ShallowCopy creates a new shallow copy of the message. Parts can be re-arranged in the new copy, and JSON parts can be read and set, without impacting other message copies. However, it is still unsafe to edit the content of parts.
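
The difference between the two copy methods comes down to whether the underlying byte arrays are shared. The sketch below assumes a shallow copy duplicates only the outer slice of parts while a deep copy also duplicates every part's bytes. The Message type here is a local stand-in without the unexported fields, and shallowCopy and deepCopy are hypothetical free functions rather than the package's methods.

```go
package main

import "fmt"

// Message is a local stand-in exposing only the documented Parts field.
type Message struct {
	Parts [][]byte
}

// shallowCopy duplicates the outer slice, so parts can be re-arranged
// independently, but each part still shares its bytes with the original.
func shallowCopy(m Message) Message {
	parts := make([][]byte, len(m.Parts))
	copy(parts, m.Parts)
	return Message{Parts: parts}
}

// deepCopy duplicates the bytes of every part as well.
func deepCopy(m Message) Message {
	parts := make([][]byte, len(m.Parts))
	for i, p := range m.Parts {
		parts[i] = append([]byte(nil), p...)
	}
	return Message{Parts: parts}
}

func main() {
	orig := Message{Parts: [][]byte{[]byte("foo"), []byte("bar")}}

	// Re-arranging parts in a shallow copy leaves the original order intact.
	shallow := shallowCopy(orig)
	shallow.Parts[0], shallow.Parts[1] = shallow.Parts[1], shallow.Parts[0]
	fmt.Println(string(orig.Parts[0])) // foo

	// Editing part content through a shallow copy leaks into the original.
	shallow.Parts[0][0] = 'B'
	fmt.Println(string(orig.Parts[1])) // Bar

	// Editing part content through a deep copy does not.
	deep := deepCopy(orig)
	deep.Parts[0][0] = 'X'
	fmt.Println(string(orig.Parts[0])) // foo
}
```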

type Output added in v0.9.0

type Output interface {
	Consumer
	Closable
}

Output is a closable consumer.

type Producer

type Producer interface {
	Transactor
}

Producer is the higher level producer type.

type Response

type Response interface {
	// Error returns a non-nil error if the message failed to reach its
	// destination.
	Error() error

	// SkipAck indicates that even though there may not have been an error in
	// processing a message, it should not be acknowledged. If SkipAck is false
	// and Error is nil then all unacknowledged messages should be acknowledged
	// also.
	SkipAck() bool
}

Response is a response from an output, agent or broker that confirms to the input whether a message was received successfully.
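
How an input might act on the two methods of a Response is sketched below. The decide function and its "ack", "hold", and "retry" outcomes are hypothetical, chosen only to illustrate the rule stated in the SkipAck comment. The two response types are local stand-ins, and their return values (SkipAck false for a simple response, a nil error with SkipAck true for an unacknowledged one) are assumptions consistent with the descriptions on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Response mirrors the documented interface.
type Response interface {
	Error() error
	SkipAck() bool
}

// simpleResponse is a stand-in that carries an optional error and never asks
// for the acknowledgement to be skipped (an assumption).
type simpleResponse struct{ err error }

func (r simpleResponse) Error() error  { return r.err }
func (r simpleResponse) SkipAck() bool { return false }

// unacknowledgedResponse is a stand-in that reports success but asks the
// input not to acknowledge yet (an assumption).
type unacknowledgedResponse struct{}

func (unacknowledgedResponse) Error() error  { return nil }
func (unacknowledgedResponse) SkipAck() bool { return true }

// errSinkDown is an example failure used by this sketch.
var errSinkDown = errors.New("sink unavailable")

// decide maps a response onto a hypothetical action for the input:
//   - "retry": the message failed to reach its destination
//   - "hold":  delivered, but acknowledgement should be deferred
//   - "ack":   delivered; acknowledge this and any held messages
func decide(res Response) string {
	switch {
	case res.Error() != nil:
		return "retry"
	case res.SkipAck():
		return "hold"
	default:
		return "ack"
	}
}

func main() {
	fmt.Println(decide(simpleResponse{}))                 // ack
	fmt.Println(decide(unacknowledgedResponse{}))         // hold
	fmt.Println(decide(simpleResponse{err: errSinkDown})) // retry
}
```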

type SimpleResponse

type SimpleResponse struct {
	// contains filtered or unexported fields
}

SimpleResponse is returned by an output or agent to provide a single return message.

func NewSimpleResponse

func NewSimpleResponse(err error) SimpleResponse

NewSimpleResponse returns a response with an error (nil error signals successful receipt).

func (SimpleResponse) Error

func (o SimpleResponse) Error() error

Error returns the underlying error.

func (SimpleResponse) SkipAck added in v0.3.1

func (o SimpleResponse) SkipAck() bool

SkipAck indicates whether a successful message should be acknowledged.

type Transaction added in v0.9.0

type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload Message

	// ResponseChan should receive a response at the end of a transaction (once
	// the message is no longer owned by the receiver.) The response itself
	// indicates whether the message has been propagated successfully.
	ResponseChan chan<- Response
}

Transaction is a type representing a transaction containing a payload (the message) and a response channel, which is used to indicate whether the message was successfully propagated to the next destination.

func NewTransaction added in v0.9.0

func NewTransaction(payload Message, resChan chan<- Response) Transaction

NewTransaction creates a new transaction object from a message payload and a response channel.

type TransactionReceiver added in v0.9.0

type TransactionReceiver interface {
	// StartReceiving starts the type receiving transactions from a Transactor.
	StartReceiving(<-chan Transaction) error
}

TransactionReceiver is a type that receives transactions from a Transactor.

type Transactor added in v0.9.0

type Transactor interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan Transaction
}

Transactor is a type that sends messages and waits for a response back. The response indicates whether the message was successfully propagated to a new destination (and can be discarded from the source).
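
A Transactor and a TransactionReceiver are connected by handing the channel from TransactionChan to StartReceiving. The sketch below uses local stand-in types. sliceInput, collector, and runPipeline are hypothetical names, and returning an "already started" error on a second StartReceiving call follows the description of ErrAlreadyStarted but is still an assumption about how any given component behaves.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyStarted is a local stand-in for ErrAlreadyStarted.
var errAlreadyStarted = errors.New("type has already been started")

// Local stand-ins mirroring the documented shapes.
type Message struct{ Parts [][]byte }

type Response interface{ Error() error }

type okResponse struct{}

func (okResponse) Error() error { return nil }

type Transaction struct {
	Payload      Message
	ResponseChan chan<- Response
}

// sliceInput is a hypothetical Transactor that emits a fixed set of payloads.
type sliceInput struct {
	tChan chan Transaction
}

func newSliceInput(payloads ...string) *sliceInput {
	in := &sliceInput{tChan: make(chan Transaction)}
	go func() {
		defer close(in.tChan) // the input owns its sending channel
		resChan := make(chan Response)
		for _, p := range payloads {
			in.tChan <- Transaction{
				Payload:      Message{Parts: [][]byte{[]byte(p)}},
				ResponseChan: resChan,
			}
			<-resChan // resolve this transaction before sending another
		}
	}()
	return in
}

func (in *sliceInput) TransactionChan() <-chan Transaction { return in.tChan }

// collector is a hypothetical TransactionReceiver that records the first part
// of each message. It is not safe for concurrent StartReceiving calls.
type collector struct {
	started bool
	seen    []string
	done    chan struct{}
}

func (c *collector) StartReceiving(ts <-chan Transaction) error {
	if c.started {
		return errAlreadyStarted
	}
	c.started = true
	go func() {
		defer close(c.done)
		for t := range ts {
			c.seen = append(c.seen, string(t.Payload.Parts[0]))
			t.ResponseChan <- okResponse{}
		}
	}()
	return nil
}

// runPipeline connects an input to a collector, tries to start the collector
// a second time, and waits for the input to finish.
func runPipeline(payloads ...string) (seen []string, secondStart error) {
	in := newSliceInput(payloads...)
	out := &collector{done: make(chan struct{})}
	if err := out.StartReceiving(in.TransactionChan()); err != nil {
		return nil, err
	}
	secondStart = out.StartReceiving(in.TransactionChan())
	<-out.done
	return out.seen, secondStart
}

func main() {
	seen, secondStart := runPipeline("a", "b")
	fmt.Println(seen, secondStart)
}
```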

type UnacknowledgedResponse added in v0.3.1

type UnacknowledgedResponse struct{}

UnacknowledgedResponse is a response type that indicates the message has reached its destination but should not be acknowledged.

func NewUnacknowledgedResponse added in v0.3.1

func NewUnacknowledgedResponse() UnacknowledgedResponse

NewUnacknowledgedResponse returns an UnacknowledgedResponse.

func (UnacknowledgedResponse) Error added in v0.3.1

func (u UnacknowledgedResponse) Error() error

Error returns the underlying error.

func (UnacknowledgedResponse) SkipAck added in v0.3.1

func (u UnacknowledgedResponse) SkipAck() bool

SkipAck indicates whether a successful message should be acknowledged.
