flow

package
v0.0.27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const MessageVersion = 1

Variables

View Source
var (
	ErrPluginMisconfigured = errors.New("plugin misconfigured")
	ErrPluginFatal         = errors.New("fatal plugin error")
	ErrPluginTransient     = errors.New("transient plugin error")
	ErrPluginStop          = errors.New("not handling input, stop processing")

	ErrCarrierMisconfigured = errors.New("carrier misconfigured")
	ErrCarrierFatal         = errors.New("fatal carrier error")
	ErrCarrierTransient     = errors.New("transient carrier error")
)
View Source
var File_flow_file_data_proto protoreflect.FileDescriptor
View Source
var File_flow_wrapper_proto protoreflect.FileDescriptor

Functions

func Connect

func Connect(
	baseLogger journal.Logger,
	config *Config,
	ic InputCarrier,
	oc OutputCarrier,
	ec ErrorCarrier,
	pp PostProcessor,
	plugins []Plugin,
) error

Connect connects a series of plugins to the flow. It passes a channel to the input carrier, which it monitors for input. When a message is received, it unmarshals it into the meander flow struct and then passes the payload as the argument to the first plugin. It will pass the output of that plugin to the next and repeat that until either the last plugin returns or any plugin returns an error. It will then push the output to the output carrier, or, in case of an error it will push the original message to the error carrier.

Types

type Config

type Config struct {
	// RetryLimit -- The amount of times we'll retry before it gets branded a hard fault.
	// Set this to 0 to disable retrying.
	// Default: 10
	RetryLimit int
}

type ErrorCarrier

type ErrorCarrier interface {
	// ValidErrorCarrier returns an error if not a valid error carrier.
	ValidErrorCarrier() error
	// WriteErr writes a message to the err channel.
	WriteErr(message *MessageContext) error

	// WriteFail writes a message to the fail channel.
	WriteFail(message *MessageContext) error
}

ErrorCarrier is an interface for how errors will reach the flow.

type FileData added in v0.0.14

type FileData struct {
	Filename string `protobuf:"bytes,1,opt,name=filename,proto3" json:"filename,omitempty"`
	FileData []byte `protobuf:"bytes,2,opt,name=file_data,json=fileData,proto3" json:"file_data,omitempty"`
	MimeType string `protobuf:"bytes,3,opt,name=mime_type,json=mimeType,proto3" json:"mime_type,omitempty"`
	// contains filtered or unexported fields
}

func (*FileData) Descriptor deprecated added in v0.0.14

func (*FileData) Descriptor() ([]byte, []int)

Deprecated: Use FileData.ProtoReflect.Descriptor instead.

func (*FileData) GetFileData added in v0.0.14

func (x *FileData) GetFileData() []byte

func (*FileData) GetFilename added in v0.0.14

func (x *FileData) GetFilename() string

func (*FileData) GetMimeType added in v0.0.14

func (x *FileData) GetMimeType() string

func (*FileData) ProtoMessage added in v0.0.14

func (*FileData) ProtoMessage()

func (*FileData) ProtoReflect added in v0.0.14

func (x *FileData) ProtoReflect() protoreflect.Message

func (*FileData) Reset added in v0.0.14

func (x *FileData) Reset()

func (*FileData) String added in v0.0.14

func (x *FileData) String() string

type InputCarrier

type InputCarrier interface {
	// SetupInput takes a channel and will write input to the channel.
	// Returns error on failure
	SetupInput(inChan chan *MessageContext) error

	// AcknowledgeMessage acknowledges a message, marking it as processed.
	// If passed error is not nil, it signals a failed but processed message.
	// Returns error when acknowledgement fails.
	AcknowledgeMessage(message *MessageContext, err error) error

	// InternalError signals the input carrier that an internal error has happened.
	// This is mostly here for interactive carriers, for others this will be a noop.
	InternalError(message *MessageContext, err error) error
}

InputCarrier is an interface for how input from the flow the plugin.

type MessageContext added in v0.0.16

type MessageContext struct {
	LedgerID uuid.UUID
	// contains filtered or unexported fields
}

MessageContext contains the context of the message, this includes the actual raw message, the ID used inside the carrier used for bookkeeping, state control, a logger with tags, and a context object.

func MessageContextFromWrapperProto added in v0.0.22

func MessageContextFromWrapperProto(
	binProto []byte,
	l journal.Logger,
	options *MessageOptions,
) (*MessageContext, error)

MessageContextFromWrapperProto takes a binary wrapper protobuffer, unpacks it, populates a MessageContext with it, and returns it.

func NewMessageContext added in v0.0.22

func NewMessageContext(w *Wrapper, l journal.Logger, messageType string, options *MessageOptions) *MessageContext

NewMessageContext takes a wrapper, a logger, and a string for messageType, and then returns a full MessageContext structure, including a cloned wrapper for error handling.

func (*MessageContext) AddError added in v0.0.22

func (m *MessageContext) AddError(e error) int

AddError adds an error to the origWrapper and returns the error count.

func (*MessageContext) Context added in v0.0.16

func (m *MessageContext) Context() context.Context

func (*MessageContext) GetErrWrapper added in v0.0.22

func (m *MessageContext) GetErrWrapper() *Wrapper

GetErrWrapper returns the origWrapper.

func (*MessageContext) GetErrorProto added in v0.0.22

func (m *MessageContext) GetErrorProto() ([]byte, error)

GetErrorProto changes the origin on the original wrapper and returns the protobuffer.

func (*MessageContext) GetLedgerID added in v0.0.27

func (m *MessageContext) GetLedgerID() uuid.UUID

func (*MessageContext) GetMessageType added in v0.0.22

func (m *MessageContext) GetMessageType() string

GetMessageType returns the set message type.

func (*MessageContext) GetOutputProto added in v0.0.22

func (m *MessageContext) GetOutputProto() ([]byte, error)

GetOutputProto alters the wrapper's fields to be sent out as output, and returns the protobuffer.

func (*MessageContext) GetPayload added in v0.0.16

func (m *MessageContext) GetPayload() []byte

func (*MessageContext) GetPostProcessData added in v0.0.22

func (m *MessageContext) GetPostProcessData() map[string]string

func (*MessageContext) GetTracingID added in v0.0.16

func (m *MessageContext) GetTracingID() uuid.UUID

func (*MessageContext) GetWrapper added in v0.0.22

func (m *MessageContext) GetWrapper() *Wrapper

GetWrapper returns the wrapper.

func (*MessageContext) GetWrapperProto added in v0.0.22

func (m *MessageContext) GetWrapperProto() ([]byte, error)

GetWrapperProto returns a protobuf marshalled version of message wrapper.

func (*MessageContext) Logger added in v0.0.16

func (m *MessageContext) Logger() journal.Logger

func (*MessageContext) SetMessageType added in v0.0.22

func (m *MessageContext) SetMessageType(t string)

SetMessageType sets the message type.

func (*MessageContext) SetPayload added in v0.0.16

func (m *MessageContext) SetPayload(p []byte)

func (*MessageContext) SetPostProcessValue added in v0.0.22

func (m *MessageContext) SetPostProcessValue(key, value string)

func (*MessageContext) StopRequested added in v0.0.22

func (m *MessageContext) StopRequested() bool

StopRequested checks if a stop is requested.

type MessageOptions added in v0.0.22

type MessageOptions struct {
	// Stop will request a stop of the program.
	Stop bool
}

MessageOptions are options set for this message.

type OutputCarrier

type OutputCarrier interface {
	// WriteOutput writes a message to output
	WriteOutput(message *MessageContext) error
}

OutputCarrier is an interface for how output will be inserted in the flow.

type Plugin added in v0.0.5

type Plugin func(i *MessageContext) (err error)

Plugin is the definition of a transformation plugin function. It takes the tracing ID from the wrapper, and the bytes of the payload. It returns the output of the payload and any error it encountered.

type PostProcessor added in v0.0.16

type PostProcessor interface {
	// PostProcess is an additional "plugin" you can apply after handling output.
	PostProcess(outputError error, message *MessageContext) error
}

The PostProcessor is used for things like sending statistics or other things that are not for the continued flow of messages.

type ToggleState added in v0.0.14

type ToggleState int64
const (
	Noop ToggleState = iota
	Stop
)

type Wrapper

type Wrapper struct {
	CreationTime *timestamp.Timestamp `protobuf:"bytes,1,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"`
	Uuid         string               `protobuf:"bytes,2,opt,name=uuid,proto3" json:"uuid,omitempty"`
	Version      uint32               `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
	Errors       []string             `protobuf:"bytes,4,rep,name=errors,proto3" json:"errors,omitempty"`
	Origin       string               `protobuf:"bytes,5,opt,name=origin,proto3" json:"origin,omitempty"`
	Type         string               `protobuf:"bytes,6,opt,name=type,proto3" json:"type,omitempty"`
	Payload      []byte               `protobuf:"bytes,7,opt,name=payload,proto3" json:"payload,omitempty"`
	// contains filtered or unexported fields
}

func NewWrapper added in v0.0.22

func NewWrapper(contents []byte, msgType, msgOrigin string) *Wrapper

NewWrapper returns contents wrapped, with the payload, type, and origin set as defined, but with a newly generated UUID.

func NewWrapperWithTracingID added in v0.0.22

func NewWrapperWithTracingID(contents []byte, msgType, msgOrigin string, tracingID uuid.UUID) *Wrapper

NewWrapperWithTracingID returns a new wrapper, with the payload, type, origin, and ID set as defined.

func (*Wrapper) Descriptor deprecated

func (*Wrapper) Descriptor() ([]byte, []int)

Deprecated: Use Wrapper.ProtoReflect.Descriptor instead.

func (*Wrapper) GetCreationTime

func (x *Wrapper) GetCreationTime() *timestamp.Timestamp

func (*Wrapper) GetErrors

func (x *Wrapper) GetErrors() []string

func (*Wrapper) GetID added in v0.0.22

func (w *Wrapper) GetID() uuid.UUID

GetID gets the tracing ID from the wrapper.

func (*Wrapper) GetOrigin

func (x *Wrapper) GetOrigin() string

func (*Wrapper) GetPayload

func (x *Wrapper) GetPayload() []byte

func (*Wrapper) GetType

func (x *Wrapper) GetType() string

func (*Wrapper) GetUuid

func (x *Wrapper) GetUuid() string

func (*Wrapper) GetVersion

func (x *Wrapper) GetVersion() uint32

func (*Wrapper) ProtoMessage

func (*Wrapper) ProtoMessage()

func (*Wrapper) ProtoReflect

func (x *Wrapper) ProtoReflect() protoreflect.Message

func (*Wrapper) Reset

func (x *Wrapper) Reset()

func (*Wrapper) String

func (x *Wrapper) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL