processor

package
v0.9.0-nightly.20231212 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrSkipRecord is passed by a processor when it should Ack and skip a Record.
	// It must be separate from a plain error so that we continue instead of marking
	// the Pipeline status as degraded.
	ErrSkipRecord       = cerrors.New("record skipped")
	ErrInstanceNotFound = cerrors.New("processor instance not found")
)
View Source
var GlobalBuilderRegistry = NewBuilderRegistry()

GlobalBuilderRegistry is a global registry of processor builders. It should be treated as a read only variable.

Functions

This section is empty.

Types

type Builder

type Builder func(Config) (Interface, error)

Builder parses the config and if valid returns a processor, an error otherwise.

type BuilderRegistry

type BuilderRegistry struct {
	// contains filtered or unexported fields
}

BuilderRegistry is a registry for registering or looking up processor builders. The Register and Get methods are safe for concurrent use.

func NewBuilderRegistry

func NewBuilderRegistry() *BuilderRegistry

NewBuilderRegistry returns an empty *BuilderRegistry.

func (*BuilderRegistry) Get

func (r *BuilderRegistry) Get(procType string) (Builder, error)

Get returns the processor builder registered under the specified type. If no builder is registered under that type it returns an error.

func (*BuilderRegistry) MustGet

func (r *BuilderRegistry) MustGet(procType string) Builder

MustGet tries to get a builder and panics on error.

func (*BuilderRegistry) MustRegister

func (r *BuilderRegistry) MustRegister(procType string, b Builder)

MustRegister tries to register a builder and panics on error.

func (*BuilderRegistry) Register

func (r *BuilderRegistry) Register(procType string, b Builder) error

Register registers a processor builder under the specified type. If a builder is already registered under that type it returns an error.

type Config

type Config struct {
	Settings map[string]string
	Workers  int
}

Config holds configuration data for building a processor.

type Instance

type Instance struct {
	ID            string
	CreatedAt     time.Time
	UpdatedAt     time.Time
	ProvisionedBy ProvisionType

	Type      string
	Parent    Parent
	Config    Config
	Processor Interface
}

Instance represents a processor instance.

type Interface added in v0.3.0

type Interface interface {
	// Process runs the processor function on a record.
	Process(ctx context.Context, record record.Record) (record.Record, error)

	// InspectIn starts an inspection session for input records for this processor.
	InspectIn(ctx context.Context, id string) *inspector.Session
	// InspectOut starts an inspection session for output records for this processor.
	InspectOut(ctx context.Context, id string) *inspector.Session

	// Close closes this processor and releases any resources
	// which may have been used by it.
	Close()
}

Interface is the interface that represents a single message processor that can be executed on one record and manipulate it.

type Parent

type Parent struct {
	// ID of the parent.
	ID string
	// Type of the parent.
	Type ParentType
}

Parent represents the connection to the entity a processor is connected to.

type ParentType

type ParentType int

ParentType defines the parent type of processor.

const (
	ParentTypeConnector ParentType = iota + 1
	ParentTypePipeline
)

func (ParentType) String

func (i ParentType) String() string

type ProvisionType added in v0.3.0

type ProvisionType int

ProvisionType defines provisioning type

const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
)

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(logger log.CtxLogger, db database.DB, registry *BuilderRegistry) *Service

NewService creates a new processor service.

func (*Service) Check added in v0.5.1

func (s *Service) Check(ctx context.Context) error

func (*Service) Create

func (s *Service) Create(
	ctx context.Context,
	id string,
	procType string,
	parent Parent,
	cfg Config,
	pt ProvisionType,
) (*Instance, error)

Create will create a new processor instance.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, id string) error

Delete removes a processor from the Service.

func (*Service) Get

func (s *Service) Get(_ context.Context, id string) (*Instance, error)

Get will return a single processor or an error.

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init fetches instances from the store.

func (*Service) List

func (s *Service) List(_ context.Context) map[string]*Instance

List returns all processors in the Service.

func (*Service) Update

func (s *Service) Update(ctx context.Context, id string, cfg Config) (*Instance, error)

Update will update a processor instance config.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store handles the persistence and fetching of processor instances.

func NewStore

func NewStore(db database.DB, registry *BuilderRegistry) *Store

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete deletes instance under the key id and returns nil on success, error otherwise.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*Instance, error)

Get will return the processor instance for a given id or an error.

func (*Store) GetAll

func (s *Store) GetAll(ctx context.Context) (map[string]*Instance, error)

GetAll returns all instances stored in the database.

func (*Store) Set

func (s *Store) Set(ctx context.Context, id string, instance *Instance) error

Set stores instance under the key id and returns nil on success, error otherwise.

Directories

Path Synopsis
Code generated by MockGen.
Code generated by MockGen.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL