processor

package
v0.3.0-nightly.20220601 Latest
This package is not in the latest version of its module.
Published: May 31, 2022 | License: Apache-2.0 | Imports: 12 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrSkipRecord is passed by a Processor when it should Ack and skip a Record.
	// It must be separate from a plain error so that we continue instead of marking
	// the Pipeline status as degraded.
	ErrSkipRecord       = cerrors.New("record skipped")
	ErrInstanceNotFound = cerrors.New("processor instance not found")
)
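
The comment on ErrSkipRecord implies that a caller checks for the sentinel before treating a processor error as a failure. The following is a minimal sketch of that check, not the pipeline's actual code. It assumes the sentinel can be matched with errors.Is; errSkipRecord is a local stand-in built with the standard errors package (the real value comes from cerrors.New), and classify and the outcome names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipRecord is a local stand-in for processor.ErrSkipRecord.
var errSkipRecord = errors.New("record skipped")

// errOther represents any ordinary processing failure.
var errOther = errors.New("boom")

// outcome is a hypothetical label for what a caller might do next.
type outcome string

const (
	outcomeForward outcome = "forward"
	outcomeSkip    outcome = "ack and skip"
	outcomeFail    outcome = "fail"
)

// classify maps the error returned by a processor to an outcome.
// errors.Is also matches when the sentinel has been wrapped with %w.
func classify(err error) outcome {
	switch {
	case err == nil:
		return outcomeForward
	case errors.Is(err, errSkipRecord):
		return outcomeSkip
	default:
		return outcomeFail
	}
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(fmt.Errorf("filter: %w", errSkipRecord)))
	fmt.Println(classify(errOther))
}
```

Only the last case would correspond to the degraded status mentioned in the comment; the skip case is handled as a normal, successful path.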
var GlobalBuilderRegistry = NewBuilderRegistry()

GlobalBuilderRegistry is a global registry of processor builders. It should be treated as a read-only variable.

Functions

This section is empty.

Types

type Builder

type Builder func(Config) (Processor, error)

Builder parses the config and, if it is valid, returns a processor; otherwise it returns an error.
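
To illustrate the Builder contract, here is a sketch of a builder that validates its settings before constructing a processor. The types are local stand-ins so the example runs on its own: Processor is simplified to a single string-based Execute method (the real interface also has Type and works on record.Record), and prefixer, newPrefixer and the "prefix" setting are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Config mirrors the documented processor.Config.
type Config struct {
	Settings map[string]string
}

// Processor is a simplified stand-in for processor.Processor.
type Processor interface {
	Execute(in string) (string, error)
}

// Builder mirrors the documented signature.
type Builder func(Config) (Processor, error)

// prefixer is a hypothetical processor that prepends a fixed prefix.
type prefixer struct{ prefix string }

func (p prefixer) Execute(in string) (string, error) { return p.prefix + in, nil }

// newPrefixer validates the config up front, so a bad configuration is
// reported at build time rather than when the first record arrives.
func newPrefixer(cfg Config) (Processor, error) {
	prefix := cfg.Settings["prefix"] // reading from a nil map is safe
	if prefix == "" {
		return nil, errors.New(`prefixer: setting "prefix" is required`)
	}
	return prefixer{prefix: prefix}, nil
}

// Compile-time check that newPrefixer has the Builder signature.
var _ Builder = newPrefixer

func main() {
	p, err := newPrefixer(Config{Settings: map[string]string{"prefix": "x-"}})
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	out, _ := p.Execute("id")
	fmt.Println(out)

	_, err = newPrefixer(Config{})
	fmt.Println(err)
}
```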

type BuilderRegistry

type BuilderRegistry struct {
	// contains filtered or unexported fields
}

BuilderRegistry is a registry for registering or looking up processor builders. The Register and Get methods are safe for concurrent use.

func NewBuilderRegistry

func NewBuilderRegistry() *BuilderRegistry

NewBuilderRegistry returns an empty *BuilderRegistry.

func (*BuilderRegistry) Get

func (r *BuilderRegistry) Get(name string) (Builder, error)

Get returns the processor builder registered under the specified name. If no builder is registered under that name it returns an error.

func (*BuilderRegistry) MustGet

func (r *BuilderRegistry) MustGet(name string) Builder

MustGet tries to get a builder and panics on error.

func (*BuilderRegistry) MustRegister

func (r *BuilderRegistry) MustRegister(name string, b Builder)

MustRegister tries to register a builder and panics on error.

func (*BuilderRegistry) Register

func (r *BuilderRegistry) Register(name string, b Builder) error

Register registers a processor builder under the specified name. If a builder is already registered under that name it returns an error.
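
The documented behavior (errors on duplicate or unknown names, Must variants that panic, safety for concurrent use) can be sketched with a map guarded by a sync.RWMutex. This is an assumption about one reasonable design, not the package's implementation; registry, builder and the error messages are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// builder is a simplified stand-in for processor.Builder.
type builder func(settings map[string]string) (string, error)

// registry is a hypothetical registry keyed by processor name.
type registry struct {
	mu       sync.RWMutex
	builders map[string]builder
}

func newRegistry() *registry {
	return &registry{builders: make(map[string]builder)}
}

// Register stores b under name and rejects duplicate names.
func (r *registry) Register(name string, b builder) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.builders[name]; ok {
		return fmt.Errorf("processor builder %q already registered", name)
	}
	r.builders[name] = b
	return nil
}

// Get looks up the builder registered under name.
func (r *registry) Get(name string) (builder, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	b, ok := r.builders[name]
	if !ok {
		return nil, fmt.Errorf("processor builder %q not found", name)
	}
	return b, nil
}

// MustRegister wraps Register and panics on error.
func (r *registry) MustRegister(name string, b builder) {
	if err := r.Register(name, b); err != nil {
		panic(err)
	}
}

func main() {
	r := newRegistry()
	noop := func(map[string]string) (string, error) { return "noop", nil }
	r.MustRegister("noop", noop)

	fmt.Println(r.Register("noop", noop)) // the duplicate name is rejected
	_, err := r.Get("missing")
	fmt.Println(err)
}
```

A panicking variant such as MustRegister is typically reserved for program start-up, where a duplicate name is a programming error rather than something to recover from.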

type Config

type Config struct {
	Settings map[string]string
}

Config holds configuration data for building a processor.

type Instance

type Instance struct {
	ID        string
	CreatedAt time.Time
	UpdatedAt time.Time
	// Name is the name of the processor under which it is registered in the
	// builder registry.
	Name      string
	Parent    Parent
	Config    Config
	Processor Processor
}

Instance represents a processor instance.

type Parent

type Parent struct {
	// ID of the parent.
	ID string
	// Type of the parent.
	Type ParentType
}

Parent represents the connection to the entity a processor is connected to.

type ParentType

type ParentType int

ParentType defines the parent type of a processor.

const (
	ParentTypeConnector ParentType = iota + 1
	ParentTypePipeline
)
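
Because the constants start at iota + 1, the zero value of ParentType matches neither of them, so an unset field can be told apart from a real parent type. The sketch below re-declares the type locally to show this; isKnown is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// ParentType and its constants are copied from the declaration above.
type ParentType int

const (
	ParentTypeConnector ParentType = iota + 1
	ParentTypePipeline
)

// isKnown reports whether t is one of the declared parent types.
func isKnown(t ParentType) bool {
	return t == ParentTypeConnector || t == ParentTypePipeline
}

func main() {
	var unset ParentType // zero value, not a declared constant
	fmt.Println(int(unset), isKnown(unset))
	fmt.Println(int(ParentTypeConnector), int(ParentTypePipeline))
}
```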

func (ParentType) String

func (i ParentType) String() string

type Processor

type Processor interface {
	// Type returns the processor type.
	Type() Type
	// Execute runs the processor function on a record.
	Execute(ctx context.Context, record record.Record) (record.Record, error)
}

Processor is the interface that represents a single message processor that can be executed on one record and manipulate it.
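
As an illustration of the interface, here is a sketch of a filter-style processor that skips records with an empty payload. Everything is declared locally so the example is self-contained: Record is a deliberately small stand-in for record.Record, errSkipRecord stands in for ErrSkipRecord, and dropEmpty and run are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Type mirrors the documented processor.Type.
type Type int

const (
	TypeTransform Type = iota + 1
	TypeFilter
)

// Record is a small stand-in for record.Record.
type Record struct {
	Key     string
	Payload string
}

// Processor mirrors the documented interface, using the local Record.
type Processor interface {
	Type() Type
	Execute(ctx context.Context, r Record) (Record, error)
}

// errSkipRecord is a local stand-in for processor.ErrSkipRecord.
var errSkipRecord = errors.New("record skipped")

// dropEmpty is a hypothetical filter that skips records without a payload.
type dropEmpty struct{}

func (dropEmpty) Type() Type { return TypeFilter }

func (dropEmpty) Execute(ctx context.Context, r Record) (Record, error) {
	if err := ctx.Err(); err != nil {
		return Record{}, err // stop early if the caller cancelled
	}
	if r.Payload == "" {
		return Record{}, errSkipRecord
	}
	return r, nil
}

// Compile-time check that dropEmpty satisfies Processor.
var _ Processor = dropEmpty{}

// run executes the filter on one record with a background context.
func run(r Record) (Record, error) {
	return dropEmpty{}.Execute(context.Background(), r)
}

func main() {
	_, err := run(Record{Key: "a"})
	fmt.Println(errors.Is(err, errSkipRecord))

	out, err := run(Record{Key: "b", Payload: "hello"})
	fmt.Println(out.Payload, err)
}
```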

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(logger log.CtxLogger, db database.DB, registry *BuilderRegistry) *Service

NewService creates a new processor service.

func (*Service) Create

func (s *Service) Create(
	ctx context.Context,
	id string,
	name string,
	t Type,
	parent Parent,
	cfg Config,
) (*Instance, error)

Create will create a new processor instance.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, id string) error

Delete removes a processor from the Service.

func (*Service) Get

func (s *Service) Get(ctx context.Context, id string) (*Instance, error)

Get will return a single processor or an error.

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init fetches instances from the store.

func (*Service) List

func (s *Service) List(ctx context.Context) map[string]*Instance

List returns all processors in the Service.

func (*Service) Update

func (s *Service) Update(ctx context.Context, id string, cfg Config) (*Instance, error)

Update will update a processor instance config.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store handles the persistence and fetching of processor instances.

func NewStore

func NewStore(db database.DB, registry *BuilderRegistry) *Store

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete deletes the instance stored under the key id. It returns nil on success and an error otherwise.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*Instance, error)

Get will return the processor instance for a given id or an error.

func (*Store) GetAll

func (s *Store) GetAll(ctx context.Context) (map[string]*Instance, error)

GetAll returns all instances stored in the database.

func (*Store) Set

func (s *Store) Set(ctx context.Context, id string, instance *Instance) error

Set stores the instance under the key id. It returns nil on success and an error otherwise.

type Type

type Type int

Type defines the processor type.

const (
	TypeTransform Type = iota + 1
	TypeFilter
)

func (Type) String

func (i Type) String() string

Directories

Path Synopsis
Package mock is a generated GoMock package.
