processor

package
v0.12.0-nightly.20240906
Warning

This package is not in the latest version of its module.

Published: Sep 5, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrInstanceNotFound = cerrors.New("processor instance not found")
	ErrProcessorRunning = cerrors.New("processor already running")
)
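
Callers usually match sentinel errors like these with errors.Is rather than by comparing strings. The sketch below is self-contained and uses the standard library errors package as a stand-in for cerrors; it assumes the real sentinels behave the same way under errors.Is. The lookup and isNotFound helpers are hypothetical and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's sentinel errors. The real values are created
// with cerrors.New; plain errors.New is used here to stay self-contained.
var (
	ErrInstanceNotFound = errors.New("processor instance not found")
	ErrProcessorRunning = errors.New("processor already running")
)

// lookup is a hypothetical helper that wraps the sentinel with the missing ID.
func lookup(instances map[string]string, id string) (string, error) {
	plugin, ok := instances[id]
	if !ok {
		return "", fmt.Errorf("id %q: %w", id, ErrInstanceNotFound)
	}
	return plugin, nil
}

// isNotFound reports whether err is, or wraps, ErrInstanceNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, ErrInstanceNotFound)
}

func main() {
	_, err := lookup(map[string]string{}, "missing")
	if isNotFound(err) {
		fmt.Println("not found:", err)
	}
}
```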

Functions

This section is empty.

Types

type Config

type Config struct {
	Settings map[string]string
	Workers  int
}

Config holds configuration data for building a processor.

type Instance

type Instance struct {
	ID            string
	CreatedAt     time.Time
	UpdatedAt     time.Time
	ProvisionedBy ProvisionType

	Plugin string
	// Condition is a Go template string. The template receives an sdk.Record
	// and should evaluate to a boolean value that indicates whether the
	// processor runs for that record. Template functions provided by `sprig`
	// are available.
	Condition string
	Parent    Parent
	Config    Config
	// contains filtered or unexported fields
}

Instance represents a processor persisted in a database. An Instance is used to create a RunnableProcessor, which represents a processor that can be used in a pipeline.
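
To illustrate how a Condition string could be turned into a boolean, here is a minimal sketch that uses only the standard library text/template package. The record type, its Metadata field, and the evalCondition helper are hypothetical stand-ins; the package's actual evaluation code is not shown on this page, and the sprig functions mentioned in the field comment are not wired in here.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"text/template"
)

// record is a hypothetical stand-in for the record passed to the template.
type record struct {
	Metadata map[string]string
}

// evalCondition executes cond against rec and parses the rendered output
// as a boolean.
func evalCondition(cond string, rec record) (bool, error) {
	tmpl, err := template.New("condition").Parse(cond)
	if err != nil {
		return false, err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, rec); err != nil {
		return false, err
	}
	return strconv.ParseBool(strings.TrimSpace(sb.String()))
}

func main() {
	cond := `{{ eq .Metadata.source "orders" }}`
	rec := record{Metadata: map[string]string{"source": "orders"}}
	ok, err := evalCondition(cond, rec)
	fmt.Println(ok, err)
}
```

A condition whose output is not a boolean literal makes strconv.ParseBool return an error, so in this sketch a malformed condition surfaces as an error rather than silently skipping the record.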

func (*Instance) Close added in v0.9.0

func (i *Instance) Close()

func (*Instance) InspectIn added in v0.9.0

func (i *Instance) InspectIn(ctx context.Context, id string) *inspector.Session

func (*Instance) InspectOut added in v0.9.0

func (i *Instance) InspectOut(ctx context.Context, id string) *inspector.Session

type Parent

type Parent struct {
	// ID of the parent.
	ID string
	// Type of the parent.
	Type ParentType
}

Parent identifies the entity (a connector or a pipeline) that a processor is attached to.

type ParentType

type ParentType int

ParentType defines the parent type of processor.

const (
	ParentTypeConnector ParentType = iota + 1
	ParentTypePipeline
)
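
Because the constants start at iota + 1, the zero value of ParentType matches neither constant. The sketch below mirrors the declaration locally to show this; the isValid helper is hypothetical and not part of this package.

```go
package main

import "fmt"

// ParentType mirrors the declaration above so the example is self-contained.
type ParentType int

const (
	ParentTypeConnector ParentType = iota + 1
	ParentTypePipeline
)

// isValid is a hypothetical helper that rejects the zero value and any
// other number that is not one of the declared constants.
func isValid(t ParentType) bool {
	return t == ParentTypeConnector || t == ParentTypePipeline
}

func main() {
	var zero ParentType
	fmt.Println(int(ParentTypeConnector), int(ParentTypePipeline), isValid(zero))
}
```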

func (ParentType) String

func (i ParentType) String() string

type PluginService added in v0.9.0

type PluginService interface {
	NewProcessor(ctx context.Context, pluginName string, id string) (sdk.Processor, error)
}

type ProvisionType added in v0.3.0

type ProvisionType int

ProvisionType defines the provisioning type.

const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
)

type RunnableProcessor added in v0.9.0

type RunnableProcessor struct {
	*Instance
	// contains filtered or unexported fields
}

RunnableProcessor is a stream.Processor which has been initialized and is ready to be used in a pipeline.

func (*RunnableProcessor) Open added in v0.9.0

func (p *RunnableProcessor) Open(ctx context.Context) error

func (*RunnableProcessor) Process added in v0.9.0

func (p *RunnableProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*RunnableProcessor) Teardown added in v0.9.0

func (p *RunnableProcessor) Teardown(ctx context.Context) error
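
The three methods suggest an open, process, teardown lifecycle. The sketch below shows one way a caller might drive it, under the assumption that Open comes first and Teardown last; the page does not document the required call order. The lifecycle interface, the upper type, and the run and runBatch helpers are hypothetical, and records are simplified to plain strings instead of opencdc.Record and sdk.ProcessedRecord.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// lifecycle mirrors the three RunnableProcessor methods with simplified
// record types (plain strings).
type lifecycle interface {
	Open(ctx context.Context) error
	Process(ctx context.Context, records []string) []string
	Teardown(ctx context.Context) error
}

// upper is a toy processor that upper-cases every record.
type upper struct{ opened bool }

func (u *upper) Open(context.Context) error { u.opened = true; return nil }

func (u *upper) Process(_ context.Context, records []string) []string {
	out := make([]string, len(records))
	for i, r := range records {
		out[i] = strings.ToUpper(r)
	}
	return out
}

func (u *upper) Teardown(context.Context) error { u.opened = false; return nil }

// run opens p, processes one batch, and tears p down even after success.
func run(ctx context.Context, p lifecycle, batch []string) (out []string, err error) {
	if err = p.Open(ctx); err != nil {
		return nil, err
	}
	defer func() {
		// Report a teardown failure only if nothing else failed first.
		if tErr := p.Teardown(ctx); err == nil {
			err = tErr
		}
	}()
	return p.Process(ctx, batch), nil
}

// runBatch runs the toy processor with a background context.
func runBatch(batch []string) ([]string, error) {
	return run(context.Background(), &upper{}, batch)
}

func main() {
	out, err := runBatch([]string{"a", "b"})
	fmt.Println(out, err)
}
```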

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(logger log.CtxLogger, db database.DB, registry PluginService) *Service

NewService creates a new processor plugin service.

func (*Service) Check added in v0.5.1

func (s *Service) Check(ctx context.Context) error

func (*Service) Create

func (s *Service) Create(
	ctx context.Context,
	id string,
	plugin string,
	parent Parent,
	cfg Config,
	pt ProvisionType,
	cond string,
) (*Instance, error)

Create creates a new processor instance.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, id string) error

Delete removes a processor from the Service.

func (*Service) Get

func (s *Service) Get(_ context.Context, id string) (*Instance, error)

Get returns a single processor or an error.

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init fetches instances from the store.

func (*Service) List

func (s *Service) List(_ context.Context) map[string]*Instance

List returns all processors in the Service.

func (*Service) MakeRunnableProcessor added in v0.9.0

func (s *Service) MakeRunnableProcessor(ctx context.Context, i *Instance) (*RunnableProcessor, error)

func (*Service) Update

func (s *Service) Update(ctx context.Context, id string, cfg Config) (*Instance, error)

Update updates the config of a processor instance.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store handles the persistence and fetching of processor instances.

func NewStore

func NewStore(db database.DB) *Store

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete deletes the instance stored under the key id. It returns nil on success and an error otherwise.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*Instance, error)

Get returns the processor instance for a given id or an error.

func (*Store) GetAll

func (s *Store) GetAll(ctx context.Context) (map[string]*Instance, error)

GetAll returns all instances stored in the database.

func (*Store) Set

func (s *Store) Set(ctx context.Context, id string, instance *Instance) error

Set stores the instance under the key id. It returns nil on success and an error otherwise.
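
The four Store methods form a small key-value contract. The in-memory sketch below mirrors their shapes so the contract can be exercised without a database.DB. Everything in it is hypothetical: the instance and memStore types, the bg context variable, and in particular the choice to return ErrInstanceNotFound for a missing key, which this page does not confirm for the real Store. The map is not safe for concurrent use.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrInstanceNotFound stands in for the package's sentinel error.
var ErrInstanceNotFound = errors.New("processor instance not found")

// bg is a shared background context for the example calls.
var bg = context.Background()

// instance is a trimmed-down stand-in for Instance.
type instance struct {
	ID     string
	Plugin string
}

// memStore keeps instances in a map instead of a database.
type memStore struct {
	data map[string]*instance
}

func newMemStore() *memStore {
	return &memStore{data: map[string]*instance{}}
}

// Set stores i under id, overwriting any previous value.
func (s *memStore) Set(_ context.Context, id string, i *instance) error {
	s.data[id] = i
	return nil
}

// Get returns the instance stored under id.
func (s *memStore) Get(_ context.Context, id string) (*instance, error) {
	i, ok := s.data[id]
	if !ok {
		return nil, ErrInstanceNotFound
	}
	return i, nil
}

// GetAll returns a copy of the map so callers cannot mutate the store.
func (s *memStore) GetAll(_ context.Context) (map[string]*instance, error) {
	out := make(map[string]*instance, len(s.data))
	for id, i := range s.data {
		out[id] = i
	}
	return out, nil
}

// Delete removes the instance stored under id.
func (s *memStore) Delete(_ context.Context, id string) error {
	if _, ok := s.data[id]; !ok {
		return ErrInstanceNotFound
	}
	delete(s.data, id)
	return nil
}

func main() {
	s := newMemStore()
	_ = s.Set(bg, "p1", &instance{ID: "p1", Plugin: "example"})
	all, _ := s.GetAll(bg)
	fmt.Println(len(all))
	_ = s.Delete(bg, "p1")
	_, err := s.Get(bg, "p1")
	fmt.Println(err)
}
```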

Directories

Path Synopsis
Package mock is a generated GoMock package.