types

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 3, 2017 License: MIT Imports: 3 Imported by: 45

Documentation

Overview

Package types - Defines the general structs and interfaces used throughout the Benthos code base.

Benthos uses abstract types to represent arbitrary producers and consumers of data to its core components. This allows us to construct types for piping data in various arrangements without regard for the specific destinations and sources of our data.

The basic principle behind a producer/consumer relationship is that a producer pipes data to the consumer in lock-step, where for each message sent it will expect a response that confirms the message was received and propagated onwards.

Messages and responses are sent via channels. To establish this pairing, each type is expected to create and retain ownership of its own sending channel.
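The lock-step exchange described above can be sketched with plain channels. This is a minimal illustration, not the package's implementation: `Message`, `Response` and `simpleResponse` are local stand-ins that mirror the shapes documented on this page, and `consume` and `produce` are hypothetical helpers. Each side creates and owns the channel it sends on.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and Response are local stand-ins, not imports from the real package.
type Message struct {
	Parts [][]byte
}

type Response interface {
	Error() error
}

// simpleResponse is a hypothetical Response implementation for this sketch.
type simpleResponse struct{ err error }

func (r simpleResponse) Error() error { return r.err }

// consume reads messages until the channel closes and sends exactly one
// response per message on a channel that it creates and owns.
func consume(msgs <-chan Message) <-chan Response {
	resps := make(chan Response)
	go func() {
		defer close(resps)
		for m := range msgs {
			var err error
			if len(m.Parts) == 0 {
				// Assumption for illustration: empty messages are rejected.
				err = errors.New("empty message")
			}
			resps <- simpleResponse{err: err}
		}
	}()
	return resps
}

// produce sends each message and waits for its response before sending the
// next one. It returns how many messages were acknowledged without error.
func produce(batch []Message) int {
	msgs := make(chan Message) // owned by the producer
	resps := consume(msgs)
	acked := 0
	for _, m := range batch {
		msgs <- m
		if res := <-resps; res.Error() == nil {
			acked++
		}
	}
	close(msgs)
	return acked
}

func main() {
	batch := []Message{
		{Parts: [][]byte{[]byte("hello")}},
		{}, // rejected by the consumer in this sketch
		{Parts: [][]byte{[]byte("world")}},
	}
	fmt.Println(produce(batch)) // 2
}
```

Because both channels are unbuffered, the producer cannot send a second message until the first has been answered, which is the lock-step property the overview describes.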

Constants

This section is empty.

Variables

var (
	ErrTimeout    = errors.New("action timed out")
	ErrChanClosed = errors.New("channel was closed unexpectedly")
	ErrTypeClosed = errors.New("type was closed")

	ErrInvalidBufferType = errors.New("buffer type was not recognised")
	ErrInvalidInputType  = errors.New("input type was not recognised")
	ErrInvalidOutputType = errors.New("output type was not recognised")

	ErrInvalidZMQType        = errors.New("invalid ZMQ socket type")
	ErrInvalidScaleProtoType = errors.New("invalid Scalability Protocols socket type")

	// ErrAlreadyStarted - Returned when an input or output type is started a second time.
	ErrAlreadyStarted = errors.New("type has already been started")

	ErrBadMessageBytes = errors.New("serialised message bytes were in unexpected format")
	ErrBlockCorrupted  = errors.New("serialised messages block was in unexpected format")
)

Errors used throughout the codebase
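Since these are sentinel values created with `errors.New`, callers can compare a returned error against them directly. The sketch below redeclares two of the errors locally with the same messages, and `waitForValue` is a hypothetical helper invented for illustration; it shows one plausible situation in which each error would be returned.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of two sentinel errors from this page, redeclared so the
// example is self-contained.
var (
	ErrTimeout    = errors.New("action timed out")
	ErrChanClosed = errors.New("channel was closed unexpectedly")
)

// waitForValue is a hypothetical helper: it waits for one value on ch and
// reports a sentinel error if the channel closes or the timeout elapses.
func waitForValue(ch <-chan int, timeout time.Duration) (int, error) {
	select {
	case v, open := <-ch:
		if !open {
			return 0, ErrChanClosed
		}
		return v, nil
	case <-time.After(timeout):
		return 0, ErrTimeout
	}
}

func main() {
	closed := make(chan int)
	close(closed)
	_, err := waitForValue(closed, time.Second)
	fmt.Println(err == ErrChanClosed) // true

	idle := make(chan int)
	_, err = waitForValue(idle, 10*time.Millisecond)
	fmt.Println(err == ErrTimeout) // true
}
```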

var (
	ErrMessageTooLarge = errors.New("message body larger than buffer space")
)

Buffer errors

Functions

This section is empty.

Types

type Closable

type Closable interface {
	// CloseAsync - Trigger a closure of this object but do not block until completion.
	CloseAsync()

	// WaitForClose - A blocking call to wait until the object has finished closing down and
	// cleaning up resources.
	WaitForClose(timeout time.Duration) error
}

Closable - Defines a type that can be safely closed down and cleaned up.
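One common way to satisfy an interface of this shape is with two signalling channels: one to request shutdown and one to report that shutdown has finished. The sketch below is an assumption about how a component might do this, not code from the package; `worker` and `newWorker` are hypothetical, and `ErrTimeout` is redeclared locally.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrTimeout is a local copy of the sentinel error listed on this page.
var ErrTimeout = errors.New("action timed out")

// Closable mirrors the interface documented above.
type Closable interface {
	CloseAsync()
	WaitForClose(timeout time.Duration) error
}

// worker is a hypothetical component used only to illustrate Closable.
type worker struct {
	closeOnce  sync.Once
	closeChan  chan struct{} // closed to request shutdown
	closedChan chan struct{} // closed once the loop has exited
}

func newWorker() *worker {
	w := &worker{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go w.loop()
	return w
}

// loop blocks until shutdown is requested; a real component would also do
// its work and release resources here.
func (w *worker) loop() {
	defer close(w.closedChan)
	<-w.closeChan
}

// CloseAsync requests shutdown without blocking and is safe to call twice.
func (w *worker) CloseAsync() {
	w.closeOnce.Do(func() { close(w.closeChan) })
}

// WaitForClose blocks until the loop has exited or the timeout elapses.
func (w *worker) WaitForClose(timeout time.Duration) error {
	select {
	case <-w.closedChan:
		return nil
	case <-time.After(timeout):
		return ErrTimeout
	}
}

func main() {
	var c Closable = newWorker()
	fmt.Println(c.WaitForClose(10 * time.Millisecond)) // action timed out
	c.CloseAsync()
	fmt.Println(c.WaitForClose(time.Second)) // <nil>
}
```

Splitting the request from the wait lets a caller trigger closure on many components first and then wait on each of them, rather than shutting them down one at a time.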

type Consumer

type Consumer interface {
	MessageReceiver
	Responder
}

Consumer - The higher level consumer type.

type ErrUnexpectedHTTPRes

type ErrUnexpectedHTTPRes struct {
	Code int
	S    string
}

ErrUnexpectedHTTPRes - Error returned when an HTTP request returned an unexpected response.

func (ErrUnexpectedHTTPRes) Error

func (e ErrUnexpectedHTTPRes) Error() string

Error - Returns the Error string.

type HTTPMessage

type HTTPMessage struct {
	Parts []string `json:"parts"`
}

HTTPMessage - A struct containing a benthos message, to be sent over the wire in an HTTP request. Message parts are sent as JSON strings.

type HTTPResponse

type HTTPResponse struct {
	Error string `json:"error"`
}

HTTPResponse - A struct expected as an HTTP response after sending a message. The intention is to confirm that the message has been received successfully.
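Given the struct tags shown above, the wire format of both types follows from `encoding/json`. The sketch below redeclares the two structs locally; `encodeParts` and `decodeResponse` are hypothetical helpers, and treating an empty `error` string as success is an assumption rather than something this page states.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the structs documented above.
type HTTPMessage struct {
	Parts []string `json:"parts"`
}

type HTTPResponse struct {
	Error string `json:"error"`
}

// encodeParts is a hypothetical helper that converts raw message parts to
// strings and serialises them as an HTTPMessage body.
func encodeParts(parts [][]byte) ([]byte, error) {
	msg := HTTPMessage{Parts: make([]string, len(parts))}
	for i, p := range parts {
		msg.Parts[i] = string(p)
	}
	return json.Marshal(msg)
}

// decodeResponse is a hypothetical helper that parses a response body and
// returns its error string. Assumption: an empty string means success.
func decodeResponse(body []byte) (string, error) {
	var res HTTPResponse
	if err := json.Unmarshal(body, &res); err != nil {
		return "", err
	}
	return res.Error, nil
}

func main() {
	body, err := encodeParts([][]byte{[]byte("foo"), []byte("bar")})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"parts":["foo","bar"]}

	msg, err := decodeResponse([]byte(`{"error":"buffer full"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(msg) // buffer full
}
```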

type MappedResponse

type MappedResponse struct {
	Errors map[int]error
}

MappedResponse - Returned by a broker to provide a map of the errors returned by its agents, keyed by agent index.

func NewMappedResponse

func NewMappedResponse() MappedResponse

NewMappedResponse - Returns a response tailored for a broker with multiple agents.

func (MappedResponse) Error

func (b MappedResponse) Error() error

Error - Returns nil if no errors are present, otherwise a single error that concatenates all of them.

func (MappedResponse) ErrorMap

func (b MappedResponse) ErrorMap() map[int]error

ErrorMap - Returns a map of errors returned by agents, represented by index.
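The sketch below shows how a broker-style response of this shape might be populated and read. It is a local stand-in written for illustration: the method set matches the signatures above, but the concatenation format used by `Error` (sorted `index: message` pairs joined with semicolons) is an assumption, as the page does not specify it.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// MappedResponse is a local stand-in with the same exported shape as above.
type MappedResponse struct {
	Errors map[int]error
}

func NewMappedResponse() MappedResponse {
	return MappedResponse{Errors: map[int]error{}}
}

// Error returns nil when no agent failed. The joined format below is an
// assumption made for this sketch.
func (b MappedResponse) Error() error {
	if len(b.Errors) == 0 {
		return nil
	}
	indexes := make([]int, 0, len(b.Errors))
	for i := range b.Errors {
		indexes = append(indexes, i)
	}
	sort.Ints(indexes) // map iteration order is random, so sort for stable output
	parts := make([]string, 0, len(indexes))
	for _, i := range indexes {
		parts = append(parts, fmt.Sprintf("%d: %v", i, b.Errors[i]))
	}
	return errors.New(strings.Join(parts, "; "))
}

func (b MappedResponse) ErrorMap() map[int]error {
	return b.Errors
}

// errAgentDown is a hypothetical agent error used in the example.
var errAgentDown = errors.New("agent unreachable")

func main() {
	res := NewMappedResponse()
	fmt.Println(res.Error() == nil) // true

	res.Errors[2] = errAgentDown
	res.Errors[0] = errors.New("action timed out")
	fmt.Println(res.Error()) // 0: action timed out; 2: agent unreachable

	// A caller can retry only the agents that failed.
	for index := range res.ErrorMap() {
		_ = index // e.g. resend to agent number index
	}
}
```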

type Message

type Message struct {
	Parts [][]byte `json:"parts"`
}

Message - A struct containing any relevant fields of a benthos message and helper functions.

func FromBytes

func FromBytes(b []byte) (Message, error)

FromBytes - Deserialise a Message from a byte slice.

func NewMessage

func NewMessage() Message

NewMessage - Initializes an empty message.

func (*Message) Bytes

func (m *Message) Bytes() []byte

Bytes - Serialise the message into a single byte slice.

type MessageReceiver

type MessageReceiver interface {
	// StartReceiving - Starts the type receiving messages from a channel.
	StartReceiving(<-chan Message) error
}

MessageReceiver - A type that receives messages from an input.

type MessageSender

type MessageSender interface {
	// MessageChan - Returns the channel used for consuming messages from this input.
	MessageChan() <-chan Message
}

MessageSender - A type that sends messages to an output.

type Producer

type Producer interface {
	MessageSender
	ResponderListener
}

Producer - The higher level producer type.

type Responder

type Responder interface {
	// ResponseChan - Returns the channel on which a response is sent for every input message received.
	ResponseChan() <-chan Response
}

Responder - Defines a type that will send a response every time a message is received.

type ResponderListener

type ResponderListener interface {
	// StartListening - Starts the type listening to a channel.
	StartListening(<-chan Response) error
}

ResponderListener - A type that listens to a Responder type.
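The four small interfaces above pair up: the consumer is handed the producer's message channel, and the producer is handed the consumer's response channel. The following sketch wires the two together under stated assumptions: all types are local stand-ins with the documented method sets, and `source`, `sink`, `pipe` and `run` are hypothetical names invented for the example.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types.
type Message struct{ Parts [][]byte }

type Response interface {
	Error() error
	ErrorMap() map[int]error
}

type Producer interface {
	MessageChan() <-chan Message
	StartListening(<-chan Response) error
}

type Consumer interface {
	StartReceiving(<-chan Message) error
	ResponseChan() <-chan Response
}

// ack is a hypothetical always-successful response.
type ack struct{}

func (ack) Error() error            { return nil }
func (ack) ErrorMap() map[int]error { return nil }

// source is a hypothetical producer that emits n messages in lock-step.
type source struct {
	msgs chan Message // owned by the producer
	n    int
	done chan int // receives the number of acknowledged messages
}

func (s *source) MessageChan() <-chan Message { return s.msgs }

func (s *source) StartListening(resps <-chan Response) error {
	go func() {
		defer close(s.msgs)
		acked := 0
		for i := 0; i < s.n; i++ {
			s.msgs <- Message{Parts: [][]byte{[]byte(fmt.Sprint(i))}}
			if res, open := <-resps; open && res.Error() == nil {
				acked++
			}
		}
		s.done <- acked
	}()
	return nil
}

// sink is a hypothetical consumer that acknowledges every message.
type sink struct {
	resps chan Response // owned by the consumer
}

func (k *sink) ResponseChan() <-chan Response { return k.resps }

func (k *sink) StartReceiving(msgs <-chan Message) error {
	go func() {
		defer close(k.resps)
		for range msgs {
			k.resps <- ack{}
		}
	}()
	return nil
}

// pipe connects a producer and a consumer in both directions.
func pipe(p Producer, c Consumer) error {
	if err := c.StartReceiving(p.MessageChan()); err != nil {
		return err
	}
	return p.StartListening(c.ResponseChan())
}

// run pipes n messages through the pair and returns how many were acknowledged.
func run(n int) int {
	s := &source{msgs: make(chan Message), n: n, done: make(chan int, 1)}
	k := &sink{resps: make(chan Response)}
	if err := pipe(s, k); err != nil {
		return -1
	}
	return <-s.done
}

func main() {
	fmt.Println(run(3)) // 3
}
```

Because `pipe` depends only on the two interfaces, any producer can be connected to any consumer without either knowing the other's concrete type.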

type Response

type Response interface {
	Error() error
	ErrorMap() map[int]error
}

Response - A response from an output, agent or broker that confirms to the input that a message was received successfully.

type SimpleResponse

type SimpleResponse struct {
	// contains filtered or unexported fields
}

SimpleResponse - Returned by an output or agent to provide a single return message.

func NewSimpleResponse

func NewSimpleResponse(err error) SimpleResponse

NewSimpleResponse - Returns a response with an error (nil error signals successful receipt).

func (SimpleResponse) Error

func (o SimpleResponse) Error() error

Error - Returns the underlying error.

func (SimpleResponse) ErrorMap

func (o SimpleResponse) ErrorMap() map[int]error

ErrorMap - Returns nil.
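A short sketch of how a caller might act on a response of this kind. `Response` and `SimpleResponse` are redeclared locally with the documented method set (the unexported `err` field is an assumption, since the real fields are not shown), and `shouldResend` is a hypothetical policy, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Response mirrors the interface documented on this page.
type Response interface {
	Error() error
	ErrorMap() map[int]error
}

// SimpleResponse is a local stand-in; its field layout is assumed.
type SimpleResponse struct {
	err error
}

func NewSimpleResponse(err error) SimpleResponse {
	return SimpleResponse{err: err}
}

func (o SimpleResponse) Error() error            { return o.err }
func (o SimpleResponse) ErrorMap() map[int]error { return nil }

// errRejected is a hypothetical delivery error used in the example.
var errRejected = errors.New("message rejected")

// shouldResend is a hypothetical policy: resend whenever the response
// carries an error, since a nil error signals successful receipt.
func shouldResend(res Response) bool {
	return res.Error() != nil
}

func main() {
	fmt.Println(shouldResend(NewSimpleResponse(nil)))         // false
	fmt.Println(shouldResend(NewSimpleResponse(errRejected))) // true
}
```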
