Documentation ¶
Index ¶
- Constants
- Variables
- func Connect(baseLogger journal.Logger, config *Config, ic InputCarrier, oc OutputCarrier, ...) error
- type Config
- type ErrorCarrier
- type FileData
- func (*FileData) Descriptor() ([]byte, []int)deprecated
- func (x *FileData) GetFileData() []byte
- func (x *FileData) GetFilename() string
- func (x *FileData) GetMimeType() string
- func (*FileData) ProtoMessage()
- func (x *FileData) ProtoReflect() protoreflect.Message
- func (x *FileData) Reset()
- func (x *FileData) String() string
- type InputCarrier
- type MessageContext
- func (m *MessageContext) AddError(e error) int
- func (m *MessageContext) Context() context.Context
- func (m *MessageContext) GetErrWrapper() *Wrapper
- func (m *MessageContext) GetErrorProto() ([]byte, error)
- func (m *MessageContext) GetLedgerID() uuid.UUID
- func (m *MessageContext) GetMessageType() string
- func (m *MessageContext) GetOutputProto() ([]byte, error)
- func (m *MessageContext) GetPayload() []byte
- func (m *MessageContext) GetPostProcessData() map[string]string
- func (m *MessageContext) GetTracingID() uuid.UUID
- func (m *MessageContext) GetWrapper() *Wrapper
- func (m *MessageContext) GetWrapperProto() ([]byte, error)
- func (m *MessageContext) Logger() journal.Logger
- func (m *MessageContext) SetMessageType(t string)
- func (m *MessageContext) SetPayload(p []byte)
- func (m *MessageContext) SetPostProcessValue(key, value string)
- func (m *MessageContext) StopRequested() bool
- type MessageOptions
- type OutputCarrier
- type Plugin
- type PostProcessor
- type ToggleState
- type Wrapper
- func (*Wrapper) Descriptor() ([]byte, []int)deprecated
- func (x *Wrapper) GetCreationTime() *timestamp.Timestamp
- func (x *Wrapper) GetErrors() []string
- func (w *Wrapper) GetID() uuid.UUID
- func (x *Wrapper) GetOrigin() string
- func (x *Wrapper) GetPayload() []byte
- func (x *Wrapper) GetType() string
- func (x *Wrapper) GetUuid() string
- func (x *Wrapper) GetVersion() uint32
- func (*Wrapper) ProtoMessage()
- func (x *Wrapper) ProtoReflect() protoreflect.Message
- func (x *Wrapper) Reset()
- func (x *Wrapper) String() string
Constants ¶
const MessageVersion = 1
Variables ¶
var ( ErrPluginMisconfigured = errors.New("plugin misconfigured") ErrPluginFatal = errors.New("fatal plugin error") ErrPluginTransient = errors.New("transient plugin error") ErrPluginStop = errors.New("not handling input, stop processing") ErrCarrierMisconfigured = errors.New("carrier misconfigured") ErrCarrierFatal = errors.New("fatal carrier error") ErrCarrierTransient = errors.New("transient carrier error") )
var File_flow_file_data_proto protoreflect.FileDescriptor
var File_flow_wrapper_proto protoreflect.FileDescriptor
Functions ¶
func Connect ¶
func Connect( baseLogger journal.Logger, config *Config, ic InputCarrier, oc OutputCarrier, ec ErrorCarrier, pp PostProcessor, plugins []Plugin, ) error
Connect connects a series of plugins to the flow. It passes a channel to the input carrier, which it monitors for input. When a message is received, it unmarshals it into the meander flow struct and then passes the payload as the argument to the first plugin. It will pass the output of that plugin to the next and repeat that until either the last plugin returns or any plugin returns an error. It will then push the output to the output carrier, or, in case of an error it will push the original message to the error carrier.
Types ¶
type Config ¶
type Config struct { // RetryLimit -- The amount of times we'll retry before it gets branded a hard fault. // Set this to 0 to disable retrying. // Default: 10 RetryLimit int }
type ErrorCarrier ¶
type ErrorCarrier interface { // ValidErrorCarrier returns an error if not a valid error carrier. ValidErrorCarrier() error // WriteErr writes a message to the err channel. WriteErr(message *MessageContext) error // WriteFail writes a message to the fail channel. WriteFail(message *MessageContext) error }
ErrorCarrier is an interface for how errors will reach the flow.
type FileData ¶ added in v0.0.14
type FileData struct { Filename string `protobuf:"bytes,1,opt,name=filename,proto3" json:"filename,omitempty"` FileData []byte `protobuf:"bytes,2,opt,name=file_data,json=fileData,proto3" json:"file_data,omitempty"` MimeType string `protobuf:"bytes,3,opt,name=mime_type,json=mimeType,proto3" json:"mime_type,omitempty"` // contains filtered or unexported fields }
func (*FileData) Descriptor
deprecated
added in
v0.0.14
func (*FileData) GetFileData ¶ added in v0.0.14
func (*FileData) GetFilename ¶ added in v0.0.14
func (*FileData) GetMimeType ¶ added in v0.0.14
func (*FileData) ProtoMessage ¶ added in v0.0.14
func (*FileData) ProtoMessage()
func (*FileData) ProtoReflect ¶ added in v0.0.14
func (x *FileData) ProtoReflect() protoreflect.Message
type InputCarrier ¶
type InputCarrier interface { // SetupInput takes a channel and will write input to the channel. // Returns error on failure SetupInput(inChan chan *MessageContext) error // AcknowledgeMessage acknowledges a message, marking it as processed. // If passed error is not nil, it signals a failed but processed message. // Returns error when acknowledgement fails. AcknowledgeMessage(message *MessageContext, err error) error // InternalError signals the input carrier that an internal error has happened. // This is mostly here for interactive carriers, for others this will be a noop. InternalError(message *MessageContext, err error) error }
InputCarrier is an interface for how input from the flow the plugin.
type MessageContext ¶ added in v0.0.16
MessageContext contains the context of the message, this includes the actual raw message, the ID used inside the carrier used for bookkeeping, state control, a logger with tags, and a context object.
func MessageContextFromWrapperProto ¶ added in v0.0.22
func MessageContextFromWrapperProto( binProto []byte, l journal.Logger, options *MessageOptions, ) (*MessageContext, error)
MessageContextFromWrapperProto takes a binary wrapper protobuffer, unpacks it, populates a MessageContext with it, and returns it.
func NewMessageContext ¶ added in v0.0.22
func NewMessageContext(w *Wrapper, l journal.Logger, messageType string, options *MessageOptions) *MessageContext
NewMessageContext takes a wrapper, a logger, and a string for messageType, and then returns a full MessageContext structure, including a cloned wrapper for error handling.
func (*MessageContext) AddError ¶ added in v0.0.22
func (m *MessageContext) AddError(e error) int
AddError adds an error to the origWrapper and returns the error count.
func (*MessageContext) Context ¶ added in v0.0.16
func (m *MessageContext) Context() context.Context
func (*MessageContext) GetErrWrapper ¶ added in v0.0.22
func (m *MessageContext) GetErrWrapper() *Wrapper
GetErrWrapper returns the origWrapper.
func (*MessageContext) GetErrorProto ¶ added in v0.0.22
func (m *MessageContext) GetErrorProto() ([]byte, error)
GetErrorProto changes the origin on the original wrapper and returns the protobuffer.
func (*MessageContext) GetLedgerID ¶ added in v0.0.27
func (m *MessageContext) GetLedgerID() uuid.UUID
func (*MessageContext) GetMessageType ¶ added in v0.0.22
func (m *MessageContext) GetMessageType() string
GetMessageType returns the set message type.
func (*MessageContext) GetOutputProto ¶ added in v0.0.22
func (m *MessageContext) GetOutputProto() ([]byte, error)
GetOutputProto alters the wrapper's fields to be sent out as output, and returns the protobuffer.
func (*MessageContext) GetPayload ¶ added in v0.0.16
func (m *MessageContext) GetPayload() []byte
func (*MessageContext) GetPostProcessData ¶ added in v0.0.22
func (m *MessageContext) GetPostProcessData() map[string]string
func (*MessageContext) GetTracingID ¶ added in v0.0.16
func (m *MessageContext) GetTracingID() uuid.UUID
func (*MessageContext) GetWrapper ¶ added in v0.0.22
func (m *MessageContext) GetWrapper() *Wrapper
GetWrapper returns the wrapper.
func (*MessageContext) GetWrapperProto ¶ added in v0.0.22
func (m *MessageContext) GetWrapperProto() ([]byte, error)
GetWrapperProto returns a protobuf marshalled version of message wrapper.
func (*MessageContext) Logger ¶ added in v0.0.16
func (m *MessageContext) Logger() journal.Logger
func (*MessageContext) SetMessageType ¶ added in v0.0.22
func (m *MessageContext) SetMessageType(t string)
SetMessageType sets the message type.
func (*MessageContext) SetPayload ¶ added in v0.0.16
func (m *MessageContext) SetPayload(p []byte)
func (*MessageContext) SetPostProcessValue ¶ added in v0.0.22
func (m *MessageContext) SetPostProcessValue(key, value string)
func (*MessageContext) StopRequested ¶ added in v0.0.22
func (m *MessageContext) StopRequested() bool
StopRequested checks if a stop is requested.
type MessageOptions ¶ added in v0.0.22
type MessageOptions struct { // Stop will request a stop of the program. Stop bool }
MessageOptions are options set for this message.
type OutputCarrier ¶
type OutputCarrier interface { // WriteOutput writes a message to output WriteOutput(message *MessageContext) error }
OutputCarrier is an interface for how output will be inserted in the flow.
type Plugin ¶ added in v0.0.5
type Plugin func(i *MessageContext) (err error)
Plugin is the definition of a transformation plugin function. It takes the tracing ID from the wrapper, and the bytes of the payload. It returns the output of the payload and any error it encountered.
type PostProcessor ¶ added in v0.0.16
type PostProcessor interface { // PostProcess is an additional "plugin" you can apply after handling output. PostProcess(outputError error, message *MessageContext) error }
The PostProcessor is used for things like sending statistics or other things that are not for the continued flow of messages.
type Wrapper ¶
type Wrapper struct { CreationTime *timestamp.Timestamp `protobuf:"bytes,1,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"` Uuid string `protobuf:"bytes,2,opt,name=uuid,proto3" json:"uuid,omitempty"` Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` Errors []string `protobuf:"bytes,4,rep,name=errors,proto3" json:"errors,omitempty"` Origin string `protobuf:"bytes,5,opt,name=origin,proto3" json:"origin,omitempty"` Type string `protobuf:"bytes,6,opt,name=type,proto3" json:"type,omitempty"` Payload []byte `protobuf:"bytes,7,opt,name=payload,proto3" json:"payload,omitempty"` // contains filtered or unexported fields }
func NewWrapper ¶ added in v0.0.22
NewWrapper returns contents wrapped, with the payload, type, and origin set as defined, but with a newly generated UUID.
func NewWrapperWithTracingID ¶ added in v0.0.22
func NewWrapperWithTracingID(contents []byte, msgType, msgOrigin string, tracingID uuid.UUID) *Wrapper
NewWrapperWithTracingID returns a new wrapper, with the payload, type, origin, and ID set as defined.
func (*Wrapper) Descriptor
deprecated
func (*Wrapper) GetCreationTime ¶
func (*Wrapper) GetPayload ¶
func (*Wrapper) GetVersion ¶
func (*Wrapper) ProtoMessage ¶
func (*Wrapper) ProtoMessage()
func (*Wrapper) ProtoReflect ¶
func (x *Wrapper) ProtoReflect() protoreflect.Message