package types

v0.16.1
Warning

This package is not in the latest version of its module.

Published: Jul 9, 2018 License: MIT Imports: 5 Imported by: 45

Documentation

Overview

Package types defines the general structs and interfaces used throughout the Benthos code base.

Benthos uses abstract types to represent arbitrary producers and consumers of data to its core components. This allows us to construct types for piping data in various arrangements without regard for the specific destinations and sources of our data.

The basic principle behind a producer/consumer relationship is that a producer pipes data to the consumer in lock-step: for each message sent, the producer expects a response confirming that the message was received and propagated onwards.

Messages and responses are sent via channels, and in order to instigate this pairing each type is expected to create and maintain ownership of its respective sending channel.
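The lock-step exchange described above can be sketched with plain channels. This is a minimal illustration, not the Benthos implementation: Transaction and Response are local stand-ins for the types documented below (with a byte slice in place of Message), and produce, consume and run are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Response and Transaction are local stand-ins that mirror the shapes
// documented in this package; they are not imported from Benthos.
type Response interface {
	Error() error
}

type simpleResponse struct{ err error }

func (r simpleResponse) Error() error { return r.err }

type Transaction struct {
	Payload      []byte // stand-in for the Message interface
	ResponseChan chan<- Response
}

// consume resolves every transaction it receives by sending exactly one
// response, and returns when the transaction channel is closed.
func consume(tChan <-chan Transaction, handle func([]byte) error) {
	for t := range tChan {
		t.ResponseChan <- simpleResponse{err: handle(t.Payload)}
	}
}

// produce sends payloads in lock-step: the next transaction is only sent
// after the previous one has been resolved. It returns one error (possibly
// nil) per payload and closes the transaction channel when finished.
func produce(payloads [][]byte, tChan chan<- Transaction) []error {
	defer close(tChan)
	resChan := make(chan Response)
	results := make([]error, 0, len(payloads))
	for _, p := range payloads {
		tChan <- Transaction{Payload: p, ResponseChan: resChan}
		results = append(results, (<-resChan).Error())
	}
	return results
}

// run wires a producer to a consumer that rejects empty payloads.
func run(payloads [][]byte) []error {
	tChan := make(chan Transaction)
	go consume(tChan, func(b []byte) error {
		if len(b) == 0 {
			return errors.New("empty payload")
		}
		return nil
	})
	return produce(payloads, tChan)
}

func main() {
	for i, err := range run([][]byte{[]byte("hello"), {}, []byte("world")}) {
		fmt.Printf("message %d: err=%v\n", i, err)
	}
}
```

Both channels are unbuffered here, so the producer cannot run ahead of the consumer.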

Constants

This section is empty.

Variables

var (
	ErrTimeout    = errors.New("action timed out")
	ErrChanClosed = errors.New("channel was closed unexpectedly")
	ErrTypeClosed = errors.New("type was closed")

	ErrNotConnected = errors.New("not connected to target source or sink")

	ErrInvalidProcessorType = errors.New("processor type was not recognised")
	ErrInvalidCacheType     = errors.New("cache type was not recognised")
	ErrInvalidConditionType = errors.New("condition type was not recognised")
	ErrInvalidBufferType    = errors.New("buffer type was not recognised")
	ErrInvalidInputType     = errors.New("input type was not recognised")
	ErrInvalidOutputType    = errors.New("output type was not recognised")

	ErrInvalidZMQType        = errors.New("invalid ZMQ socket type")
	ErrInvalidScaleProtoType = errors.New("invalid Scalability Protocols socket type")

	// ErrAlreadyStarted is returned when an input or output type gets started a
	// second time.
	ErrAlreadyStarted = errors.New("type has already been started")

	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes     = errors.New("serialised message bytes were in unexpected format")
	ErrBlockCorrupted      = errors.New("serialised messages block was in unexpected format")

	ErrNoAck = errors.New("failed to receive acknowledgement")
)

Errors used throughout the codebase
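Because these are sentinel values created with errors.New, a caller can compare a returned error against them to decide how to react. The sketch below redeclares two of the sentinels locally so that it compiles on its own; receive and retryable are hypothetical helpers, and treating only a timeout as retryable is an assumption made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of two sentinels listed above, redeclared so that the example
// compiles without importing Benthos.
var (
	ErrTimeout    = errors.New("action timed out")
	ErrTypeClosed = errors.New("type was closed")
)

// receive is a hypothetical helper: it waits for a value and reports
// ErrTimeout or ErrTypeClosed when none can be delivered.
func receive(in <-chan string, timeout time.Duration) (string, error) {
	select {
	case v, open := <-in:
		if !open {
			return "", ErrTypeClosed
		}
		return v, nil
	case <-time.After(timeout):
		return "", ErrTimeout
	}
}

// retryable compares against the sentinel value. Retrying only on timeouts
// is an assumption of this example.
func retryable(err error) bool {
	return errors.Is(err, ErrTimeout)
}

func main() {
	in := make(chan string)

	_, err := receive(in, 10*time.Millisecond)
	fmt.Println(err, retryable(err))

	close(in)
	_, err = receive(in, 10*time.Millisecond)
	fmt.Println(err, retryable(err))
}
```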

var (
	ErrCacheNotFound     = errors.New("cache not found")
	ErrConditionNotFound = errors.New("condition not found")
	ErrKeyAlreadyExists  = errors.New("key already exists")
	ErrKeyNotFound       = errors.New("key does not exist")
	ErrPipeNotFound      = errors.New("pipe was not found")
)

Manager errors

var (
	ErrMessageTooLarge = errors.New("message body larger than buffer space")
)

Buffer errors

Functions

This section is empty.

Types

type Cache added in v0.9.7

type Cache interface {
	// Get attempts to locate and return a cached value by its key, returns an
	// error if the key does not exist or if the command fails.
	Get(key string) ([]byte, error)

	// Set attempts to set the value of a key, returns an error if the command
	// fails.
	Set(key string, value []byte) error

	// Add attempts to set the value of a key only if the key does not already
	// exist, returns an error if the key already exists or if the command
	// fails.
	Add(key string, value []byte) error

	// Delete attempts to remove a key. Returns an error if a failure occurs.
	Delete(key string) error
}

Cache is a key/value store that can be shared across components and executing threads of a Benthos service.
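As an illustration of the contract, here is a minimal map-backed sketch with the same method set as Cache. memoryCache is a hypothetical type, not part of the package. Returning ErrKeyNotFound from Get and ErrKeyAlreadyExists from Add is an assumption based on the manager errors listed above, which are redeclared locally.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copies of the manager errors that this sketch assumes a cache returns.
var (
	ErrKeyAlreadyExists = errors.New("key already exists")
	ErrKeyNotFound      = errors.New("key does not exist")
)

// memoryCache is a hypothetical map-backed type whose method set matches the
// Cache interface. The mutex makes it usable from multiple goroutines.
type memoryCache struct {
	mu    sync.RWMutex
	items map[string][]byte
}

func newMemoryCache() *memoryCache {
	return &memoryCache{items: map[string][]byte{}}
}

// Get returns the value stored under key, or ErrKeyNotFound.
func (m *memoryCache) Get(key string) ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.items[key]
	if !ok {
		return nil, ErrKeyNotFound
	}
	return v, nil
}

// Set stores value under key, overwriting any existing value.
func (m *memoryCache) Set(key string, value []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.items[key] = value
	return nil
}

// Add stores value under key only if the key is not already present.
func (m *memoryCache) Add(key string, value []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, exists := m.items[key]; exists {
		return ErrKeyAlreadyExists
	}
	m.items[key] = value
	return nil
}

// Delete removes key; removing a missing key is not treated as a failure.
func (m *memoryCache) Delete(key string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.items, key)
	return nil
}

func main() {
	c := newMemoryCache()
	fmt.Println(c.Add("id-1", []byte("seen")))
	fmt.Println(c.Add("id-1", []byte("seen again")))
	v, err := c.Get("id-1")
	fmt.Printf("%s %v\n", v, err)
}
```

Add failing on an existing key is what makes a cache like this usable for deduplication: the first writer wins and later writers learn that the key was already present.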

type Closable

type Closable interface {
	// CloseAsync triggers a closure of this object but does not block until
	// completion.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the object has finished
	// closing down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}

Closable defines a type that can be safely closed down and cleaned up.
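One common way to satisfy this two-step shutdown is with a pair of signalling channels. The sketch below is an assumption about how such a type might look, not code from Benthos: worker is hypothetical, and returning ErrTimeout (redeclared locally) from WaitForClose is a choice made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Local copy of the sentinel that this sketch returns on timeout.
var ErrTimeout = errors.New("action timed out")

// worker is a hypothetical type with the Closable method set.
type worker struct {
	closeOnce sync.Once
	closeChan chan struct{} // closed to request shutdown
	doneChan  chan struct{} // closed once the loop has exited
}

func newWorker() *worker {
	w := &worker{
		closeChan: make(chan struct{}),
		doneChan:  make(chan struct{}),
	}
	go w.loop()
	return w
}

func (w *worker) loop() {
	defer close(w.doneChan)
	// A real component would do its work here until asked to stop.
	<-w.closeChan
}

// CloseAsync requests shutdown and returns immediately. The sync.Once makes
// repeated calls safe.
func (w *worker) CloseAsync() {
	w.closeOnce.Do(func() { close(w.closeChan) })
}

// WaitForClose blocks until shutdown has completed or the timeout elapses.
func (w *worker) WaitForClose(timeout time.Duration) error {
	select {
	case <-w.doneChan:
		return nil
	case <-time.After(timeout):
		return ErrTimeout
	}
}

func main() {
	w := newWorker()
	fmt.Println(w.WaitForClose(10 * time.Millisecond)) // not yet asked to close
	w.CloseAsync()
	fmt.Println(w.WaitForClose(time.Second))
}
```

Separating the request from the wait lets a caller trigger CloseAsync on many components first and only then wait on each, so they shut down in parallel.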

type Condition added in v0.9.7

type Condition interface {
	// Check tests a message against a configured condition.
	Check(msg Message) bool
}

Condition reads a message, calculates a condition and returns a boolean.
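A small sketch of a condition, under stated assumptions: Message is reduced here to the single method the example needs, and parts, containsCondition and filter are hypothetical names that do not exist in the package.

```go
package main

import (
	"bytes"
	"fmt"
)

// Message is a reduced stand-in exposing only the method this example needs.
type Message interface {
	Get(p int) []byte
}

// Condition mirrors the interface documented above.
type Condition interface {
	Check(msg Message) bool
}

// parts is a hypothetical slice-backed Message.
type parts [][]byte

// Get returns nil when the index is out of bounds; negative indexes count
// back from the last part.
func (p parts) Get(i int) []byte {
	if i < 0 {
		i += len(p)
	}
	if i < 0 || i >= len(p) {
		return nil
	}
	return p[i]
}

// containsCondition passes when the first message part contains Needle.
type containsCondition struct{ Needle []byte }

func (c containsCondition) Check(msg Message) bool {
	return bytes.Contains(msg.Get(0), c.Needle)
}

// filter keeps only the messages that pass the condition.
func filter(msgs []Message, cond Condition) []Message {
	var kept []Message
	for _, m := range msgs {
		if cond.Check(m) {
			kept = append(kept, m)
		}
	}
	return kept
}

func main() {
	cond := containsCondition{Needle: []byte("error")}
	msgs := []Message{
		parts{[]byte("level=error msg=failed")},
		parts{[]byte("level=info msg=ok")},
	}
	fmt.Println(len(filter(msgs, cond)))
}
```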

type Consumer

type Consumer interface {
	TransactionReceiver
}

Consumer is the higher level consumer type.

type DudMgr added in v0.11.4

type DudMgr struct {
	ID int
}

DudMgr is a noop implementation of a types.Manager.

func (DudMgr) GetCache added in v0.11.4

func (f DudMgr) GetCache(name string) (Cache, error)

GetCache always returns ErrCacheNotFound.

func (DudMgr) GetCondition added in v0.11.4

func (f DudMgr) GetCondition(name string) (Condition, error)

GetCondition always returns ErrConditionNotFound.

func (DudMgr) GetPipe added in v0.15.4

func (f DudMgr) GetPipe(name string) (<-chan Transaction, error)

GetPipe attempts to find a service wide message producer by its name.

func (DudMgr) RegisterEndpoint added in v0.11.4

func (f DudMgr) RegisterEndpoint(path, desc string, h http.HandlerFunc)

RegisterEndpoint is a noop.

func (DudMgr) SetPipe added in v0.15.4

func (f DudMgr) SetPipe(name string, t <-chan Transaction)

SetPipe registers a message producer under a name.

func (DudMgr) UnsetPipe added in v0.15.4

func (f DudMgr) UnsetPipe(name string, t <-chan Transaction)

UnsetPipe removes a named pipe.

type ErrUnexpectedHTTPRes

type ErrUnexpectedHTTPRes struct {
	Code int
	S    string
}

ErrUnexpectedHTTPRes is an error returned when an HTTP request returned an unexpected response.

func (ErrUnexpectedHTTPRes) Error

func (e ErrUnexpectedHTTPRes) Error() string

Error returns the Error string.
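Since the error carries the status code as a field, a caller can unwrap it and branch on Code. The sketch below uses a local stand-in with the documented fields. The message format produced by Error, the checkStatus helper and the retry-on-5xx rule are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUnexpectedHTTPRes mirrors the documented fields. The message format
// produced by Error below is an assumption, not the library's wording.
type ErrUnexpectedHTTPRes struct {
	Code int
	S    string
}

func (e ErrUnexpectedHTTPRes) Error() string {
	return fmt.Sprintf("unexpected HTTP response: %d %s", e.Code, e.S)
}

// checkStatus is a hypothetical helper that turns a non-2xx status into an
// error value.
func checkStatus(code int, status string) error {
	if code < 200 || code > 299 {
		return ErrUnexpectedHTTPRes{Code: code, S: status}
	}
	return nil
}

// shouldRetry inspects the status code carried by the error. Retrying only
// on 5xx codes is a policy chosen for this example.
func shouldRetry(err error) bool {
	var httpErr ErrUnexpectedHTTPRes
	if errors.As(err, &httpErr) {
		return httpErr.Code >= 500
	}
	return false
}

func main() {
	for _, code := range []int{200, 404, 503} {
		err := checkStatus(code, "status text")
		fmt.Printf("code=%d err=%v retry=%v\n", code, err, shouldRetry(err))
	}
}
```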

type HTTPMessage

type HTTPMessage struct {
	Parts []string `json:"parts"`
}

HTTPMessage is a struct containing a Benthos message, to be sent over the wire in an HTTP request. Message parts are sent as JSON strings.

type HTTPResponse

type HTTPResponse struct {
	Error string `json:"error"`
}

HTTPResponse is a struct expected as an HTTP response after sending a message. The intention is to confirm that the message has been received successfully.
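The wire format implied by the struct tags on HTTPMessage and HTTPResponse can be seen by round-tripping them through encoding/json. The two structs are redeclared locally. encodeMessage and decodeResponse are hypothetical helpers, and treating an empty Error field as success is an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of the two documented structs.
type HTTPMessage struct {
	Parts []string `json:"parts"`
}

type HTTPResponse struct {
	Error string `json:"error"`
}

// encodeMessage renders message parts as the JSON body of a request.
func encodeMessage(parts [][]byte) ([]byte, error) {
	msg := HTTPMessage{Parts: make([]string, len(parts))}
	for i, p := range parts {
		msg.Parts[i] = string(p)
	}
	return json.Marshal(msg)
}

// decodeResponse turns a response body into an error value. Treating an
// empty Error field as success is an assumption of this sketch.
func decodeResponse(body []byte) error {
	var res HTTPResponse
	if err := json.Unmarshal(body, &res); err != nil {
		return err
	}
	if res.Error != "" {
		return errors.New(res.Error)
	}
	return nil
}

func main() {
	body, err := encodeMessage([][]byte{[]byte("foo"), []byte("bar")})
	fmt.Println(string(body), err)
	fmt.Println(decodeResponse([]byte(`{"error":"buffer full"}`)))
}
```

Because parts travel as JSON strings, payloads that are not valid UTF-8 would not survive this encoding unchanged.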

type Input added in v0.9.0

type Input interface {
	Producer
	Closable
}

Input is a closable producer.

type Manager added in v0.8.0

type Manager interface {
	// RegisterEndpoint registers a server wide HTTP endpoint.
	RegisterEndpoint(path, desc string, h http.HandlerFunc)

	// GetCache attempts to find a service wide cache by its name.
	GetCache(name string) (Cache, error)

	// GetCondition attempts to find a service wide condition by its name.
	GetCondition(name string) (Condition, error)

	// GetPipe attempts to find a service wide transaction chan by its name.
	GetPipe(name string) (<-chan Transaction, error)

	// SetPipe registers a transaction chan under a name.
	SetPipe(name string, t <-chan Transaction)

	// UnsetPipe removes a named transaction chan.
	UnsetPipe(name string, t <-chan Transaction)
}

Manager is an interface expected by Benthos components that allows them to register service-wide behaviours such as HTTP endpoints and event listeners, and to obtain service-wide shared resources such as caches.

func NoopMgr added in v0.15.4

func NoopMgr() Manager

NoopMgr returns a Manager implementation that does nothing.

type Message

type Message interface {
	// Get attempts to access a message part from an index. If the index is
	// negative then the part is found by counting backwards from the last part
	// starting at -1. If the index is out of bounds then nil is returned. It is
	// not safe to edit the contents of a message directly.
	Get(p int) []byte

	// GetAll returns all message parts as a two-dimensional byte array. It is
	// NOT safe to edit the contents of the result.
	GetAll() [][]byte

	// Set edits the contents of an existing message part. If the index is
	// negative then the part is found by counting backwards from the last part
	// starting at -1. If the index is out of bounds then nothing is done.
	Set(p int, b []byte)

	// SetAll replaces all parts of a message with a new set.
	SetAll(p [][]byte)

	// GetJSON returns a message part parsed as JSON into an `interface{}` type.
	// This is lazily evaluated and the result is cached. If multiple layers of
	// a pipeline extract the same part as JSON it will only be unmarshalled
	// once, unless the content of the part has changed. If the index is
	// negative then the part is found by counting backwards from the last part
	// starting at -1.
	GetJSON(p int) (interface{}, error)

	// SetJSON sets a message part to the marshalled bytes of a JSON object, but
	// also caches the object itself. If the JSON contents of a part is
	// subsequently queried it will receive the cached version as long as the
	// raw content has not changed. If the index is negative then the part is
	// found by counting backwards from the last part starting at -1.
	SetJSON(p int, jObj interface{}) error

	// Append appends new message parts to the message and returns the index of
	// last part to be added.
	Append(b ...[]byte) int

	// Len returns the number of parts this message contains.
	Len() int

	// Iter will iterate each message part in order, calling the closure
	// argument with the index and contents of the message part.
	Iter(f func(i int, b []byte) error) error

	// Bytes returns a binary representation of the message, which can be later
	// parsed back into a multipart message with `FromBytes`. The result of this
	// call can itself be the part of a new message, which is a useful way of
	// transporting multiple part messages across protocols that only support
	// single parts.
	Bytes() []byte

	// LazyCondition lazily evaluates conditions on the message by caching the
	// results as per a label to identify the condition. The cache of results is
	// cleared whenever the contents of the message is changed.
	LazyCondition(label string, cond Condition) bool

	// ShallowCopy creates a shallow copy of the message, where the list of
	// message parts can be edited independently from the original version.
	// However, editing the byte array contents of a message part will still
	// alter the contents of the original.
	ShallowCopy() Message

	// DeepCopy creates a deep copy of the message, where the message part
	// contents are entirely copied and are therefore safe to edit without
	// altering the original.
	DeepCopy() Message

	// CreatedAt returns the time at which the message was created.
	CreatedAt() time.Time
}

Message is an interface representing a payload of data that was received from an input. Messages contain multiple parts, where each part is a byte array. If an input supports only single part messages they will still be read as multipart messages with one part.
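Two behaviours described in the interface comments are easy to misread: negative part indexes and the difference between ShallowCopy and DeepCopy. The sketch below illustrates both with a hypothetical slice-backed message type. It covers only four of the interface's methods and is not the package's implementation.

```go
package main

import "fmt"

// message is a hypothetical slice-backed type covering a subset of the
// Message interface.
type message struct{ parts [][]byte }

// index resolves a possibly negative part index; ok is false when the
// resolved index is out of bounds.
func (m *message) index(p int) (i int, ok bool) {
	if p < 0 {
		p += len(m.parts)
	}
	return p, p >= 0 && p < len(m.parts)
}

// Get returns the part at p, or nil when p is out of bounds.
func (m *message) Get(p int) []byte {
	i, ok := m.index(p)
	if !ok {
		return nil
	}
	return m.parts[i]
}

// Set replaces the part at p and does nothing when p is out of bounds.
func (m *message) Set(p int, b []byte) {
	if i, ok := m.index(p); ok {
		m.parts[i] = b
	}
}

// ShallowCopy copies the list of parts but shares the bytes of each part.
func (m *message) ShallowCopy() *message {
	return &message{parts: append([][]byte(nil), m.parts...)}
}

// DeepCopy copies the list and the bytes of every part.
func (m *message) DeepCopy() *message {
	out := make([][]byte, len(m.parts))
	for i, p := range m.parts {
		out[i] = append([]byte(nil), p...)
	}
	return &message{parts: out}
}

func main() {
	m := &message{parts: [][]byte{[]byte("first"), []byte("last")}}
	fmt.Printf("%s %s %v\n", m.Get(0), m.Get(-1), m.Get(5) == nil)

	shallow := m.ShallowCopy()
	shallow.Set(0, []byte("replaced")) // only the copy's list changes
	shallow.Get(1)[0] = 'L'            // shared bytes: the original changes too

	deep := m.DeepCopy()
	deep.Get(0)[0] = 'F' // private bytes: the original is untouched

	fmt.Printf("%s %s\n", m.Get(0), m.Get(1))
}
```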

func FromBytes

func FromBytes(b []byte) (Message, error)

FromBytes deserialises a Message from a byte array.

func LockMessage added in v0.15.0

func LockMessage(msg Message, part int) Message

LockMessage wraps a message into a read only type that restricts access to only a single message part, accessible either by index 0 or -1.

func NewMessage

func NewMessage(parts [][]byte) Message

NewMessage initializes a new message.

type Output added in v0.9.0

type Output interface {
	Consumer
	Closable
}

Output is a closable consumer.

type Producer

type Producer interface {
	Transactor
}

Producer is the higher level producer type.

type Response

type Response interface {
	// Error returns a non-nil error if the message failed to reach its
	// destination.
	Error() error

	// SkipAck indicates that even though there may not have been an error in
	// processing a message, it should not be acknowledged. If SkipAck is false
	// and Error is nil then all unacknowledged messages should be acknowledged
	// also.
	SkipAck() bool
}

Response is returned by an output, agent or broker to confirm to the input that a message was received successfully.

type SimpleResponse

type SimpleResponse struct {
	// contains filtered or unexported fields
}

SimpleResponse is returned by an output or agent to provide a single return message.

func NewSimpleResponse

func NewSimpleResponse(err error) SimpleResponse

NewSimpleResponse returns a response with an error (nil error signals successful receipt).

func (SimpleResponse) Error

func (o SimpleResponse) Error() error

Error returns the underlying error.

func (SimpleResponse) SkipAck added in v0.3.1

func (o SimpleResponse) SkipAck() bool

SkipAck indicates whether a successful message should be acknowledged.

type Transaction added in v0.9.0

type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload Message

	// ResponseChan should receive a response at the end of a transaction (once
	// the message is no longer owned by the receiver.) The response itself
	// indicates whether the message has been propagated successfully.
	ResponseChan chan<- Response
}

Transaction is a type representing a transaction containing a payload (the message) and a response channel, which is used to indicate whether the message was successfully propagated to the next destination.

func NewTransaction added in v0.9.0

func NewTransaction(payload Message, resChan chan<- Response) Transaction

NewTransaction creates a new transaction object from a message payload and a response channel.

type TransactionReceiver added in v0.9.0

type TransactionReceiver interface {
	// StartReceiving starts the type receiving transactions from a Transactor.
	StartReceiving(<-chan Transaction) error
}

TransactionReceiver is a type that receives transactions from a Transactor.
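A receiver typically starts a goroutine on the first call to StartReceiving and rejects later calls. The sketch below shows that pattern with local stand-ins for Transaction and Response. collector is a hypothetical type, and returning ErrAlreadyStarted on a second start follows the description of that error above but is otherwise an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copy of the sentinel described in the Variables section.
var ErrAlreadyStarted = errors.New("type has already been started")

// Response and Transaction are local stand-ins for the documented types.
type Response interface{ Error() error }

type okResponse struct{}

func (okResponse) Error() error { return nil }

type Transaction struct {
	Payload      []byte // stand-in for the Message interface
	ResponseChan chan<- Response
}

// collector is a hypothetical TransactionReceiver that stores each payload
// and confirms it.
type collector struct {
	mu      sync.Mutex
	started bool
	seen    [][]byte
	done    chan struct{} // closed when the transaction channel is drained
}

func newCollector() *collector {
	return &collector{done: make(chan struct{})}
}

// StartReceiving begins consuming transactions in the background. A second
// call returns ErrAlreadyStarted.
func (c *collector) StartReceiving(ts <-chan Transaction) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.started {
		return ErrAlreadyStarted
	}
	c.started = true
	go c.loop(ts)
	return nil
}

func (c *collector) loop(ts <-chan Transaction) {
	defer close(c.done)
	for t := range ts {
		c.mu.Lock()
		c.seen = append(c.seen, t.Payload)
		c.mu.Unlock()
		// Resolve the transaction so the sender can continue.
		t.ResponseChan <- okResponse{}
	}
}

func main() {
	c := newCollector()
	ts := make(chan Transaction)
	fmt.Println(c.StartReceiving(ts))
	fmt.Println(c.StartReceiving(ts))

	resChan := make(chan Response)
	ts <- Transaction{Payload: []byte("hello"), ResponseChan: resChan}
	fmt.Println((<-resChan).Error())

	close(ts)
	<-c.done
	fmt.Println(len(c.seen))
}
```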

type Transactor added in v0.9.0

type Transactor interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan Transaction
}

Transactor is a type that sends messages and waits for a response back. The response indicates whether the message was successfully propagated to a new destination (and can be discarded from the source).

type UnacknowledgedResponse added in v0.3.1

type UnacknowledgedResponse struct{}

UnacknowledgedResponse is a response type that indicates the message has reached its destination but should not be acknowledged.

func NewUnacknowledgedResponse added in v0.3.1

func NewUnacknowledgedResponse() UnacknowledgedResponse

NewUnacknowledgedResponse returns an UnacknowledgedResponse.

func (UnacknowledgedResponse) Error added in v0.3.1

func (u UnacknowledgedResponse) Error() error

Error returns the underlying error.

func (UnacknowledgedResponse) SkipAck added in v0.3.1

func (u UnacknowledgedResponse) SkipAck() bool

SkipAck indicates whether a successful message should be acknowledged.
