pipeline

package
v0.7.0-nightly.20230413 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values of the pipeline package. Each one is created
// with cerrors.New and carries the message shown.
var (
	// ErrTimeout is the error Service.Wait is documented to return when the
	// pipelines were not stopped within the given timeout.
	ErrTimeout             = cerrors.New("operation timed out")
	ErrGracefulShutdown    = cerrors.New("graceful shutdown")
	// ErrPipelineRunning is the error Service.Start is documented to return
	// when the pipeline is already running.
	ErrPipelineRunning     = cerrors.New("pipeline is running")
	ErrPipelineNotRunning  = cerrors.New("pipeline not running")
	ErrInstanceNotFound    = cerrors.New("pipeline instance not found")
	ErrNameMissing         = cerrors.New("must provide a pipeline name")
	ErrNameAlreadyExists   = cerrors.New("pipeline name already exists")
	ErrConnectorIDNotFound = cerrors.New("connector ID not found")
	ErrProcessorIDNotFound = cerrors.New("processor ID not found")
)
View Source
// DefaultDLQ is a DLQ value that selects the "builtin:log" plugin with the
// settings level "warn" and message "record delivery failed".
var DefaultDLQ = DLQ{
	Plugin: "builtin:log",
	Settings: map[string]string{
		"level":   "warn",
		"message": "record delivery failed",
	},
	// NOTE(review): the window semantics are not visible here. Presumably a
	// window of 1 with a nack threshold of 0 means a single nacked record
	// already exceeds the threshold — confirm against the DLQ handler.
	WindowSize:          1,
	WindowNackThreshold: 0,
}

Functions

This section is empty.

Types

type Config

type Config struct {
	// Name is the name of the pipeline.
	// NOTE(review): ErrNameMissing and ErrNameAlreadyExists suggest that an
	// empty or duplicate name is rejected — confirm where that is enforced.
	Name        string
	// Description is the description of the pipeline.
	Description string
}

Config holds configuration data for building a pipeline.

type ConnectorFetcher

type ConnectorFetcher interface {
	// Get returns the connector instance for the given id, or an error.
	Get(ctx context.Context, id string) (*connector.Instance, error)
	// Create builds a connector instance from the given id, connector type,
	// plugin name, pipeline ID, connector config and provision type, and
	// returns it or an error.
	Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, cfg connector.Config, p connector.ProvisionType) (*connector.Instance, error)
}

ConnectorFetcher can fetch and create connector instances.

type DLQ added in v0.4.0

// DLQ groups the DLQ settings of a pipeline: the plugin to use, its
// settings and two window parameters.
type DLQ struct {
	// Plugin is the name of the plugin; DefaultDLQ uses "builtin:log".
	Plugin   string
	// Settings holds string key/value settings.
	// NOTE(review): presumably handed to the plugin as its configuration —
	// confirm.
	Settings map[string]string

	// WindowSize and WindowNackThreshold are integer window parameters.
	// NOTE(review): presumably WindowSize is the number of most recent
	// records considered and WindowNackThreshold the number of nacks
	// tolerated within that window — confirm against the DLQ handler.
	WindowSize          int
	WindowNackThreshold int
}

type DLQDestination added in v0.4.0

type DLQDestination struct {
	// Destination is the stream.Destination that DLQ records are forwarded
	// to.
	Destination stream.Destination
	// Logger is a log.CtxLogger.
	// NOTE(review): presumably used for the handler's own log output —
	// confirm.
	Logger      log.CtxLogger
	// contains filtered or unexported fields
}

DLQDestination is a DLQ handler that forwards DLQ records to a destination connector.

func (*DLQDestination) Close added in v0.4.0

func (d *DLQDestination) Close(ctx context.Context) (err error)

Close stops the destination and tears it down.

func (*DLQDestination) Open added in v0.4.0

func (d *DLQDestination) Open(ctx context.Context) error

func (*DLQDestination) Write added in v0.4.0

func (d *DLQDestination) Write(ctx context.Context, rec record.Record) error

Write writes the record synchronously to the destination, meaning that it waits until an ack is received for the record before it returns. If the record write fails or the destination returns a nack, the function returns an error.

type FailureEvent added in v0.6.1

// FailureEvent is the value passed to a FailureHandler.
type FailureEvent struct {
	// ID is the ID of the pipeline which failed.
	ID    string
	// Error is the error reported for the failed pipeline.
	Error error
}

type FailureHandler added in v0.6.1

// FailureHandler is a callback that receives a FailureEvent. Handlers are
// registered through Service.OnFailure.
type FailureHandler func(FailureEvent)

type Instance

type Instance struct {
	// ID is the ID of the pipeline.
	ID            string
	// Config holds the pipeline's name and description.
	Config        Config
	// Status is the running status of the pipeline.
	Status        Status
	// Error is an error message in string form.
	// NOTE(review): presumably set when the pipeline stopped because of an
	// error — confirm.
	Error         string
	// CreatedAt and UpdatedAt are timestamps.
	// NOTE(review): presumably the creation time and the time of the last
	// modification — confirm.
	CreatedAt     time.Time
	UpdatedAt     time.Time
	// ProvisionedBy is the provision type of the pipeline
	// (ProvisionTypeAPI or ProvisionTypeConfig).
	ProvisionedBy ProvisionType
	// DLQ holds the DLQ settings of the pipeline.
	DLQ           DLQ

	// ConnectorIDs and ProcessorIDs list connector and processor IDs.
	// NOTE(review): presumably the entities attached through
	// Service.AddConnector and Service.AddProcessor — confirm.
	ConnectorIDs []string
	ProcessorIDs []string
	// contains filtered or unexported fields
}

Instance manages a collection of Connectors, which can be either Destination or Source. The pipeline sets up its publishers and subscribers based on whether the Connector in question is a Destination or a Source.

func (*Instance) Wait

func (p *Instance) Wait() error

type PluginDispenserFetcher added in v0.5.0

type PluginDispenserFetcher interface {
	// NewDispenser returns a plugin.Dispenser for the plugin with the given
	// name, or an error. The logger is passed along with the request.
	NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
}

PluginDispenserFetcher can fetch a plugin dispenser.

type ProcessorFetcher

type ProcessorFetcher interface {
	// Get returns the processor instance for the given id, or an error.
	Get(ctx context.Context, id string) (*processor.Instance, error)
}

ProcessorFetcher can fetch a processor instance.

type ProvisionType added in v0.3.0

// ProvisionType is an int-backed enumeration; Instance.ProvisionedBy holds a
// value of this type.
type ProvisionType int

ProvisionType defines how a pipeline was provisioned: through the API or through configuration.

// Values of ProvisionType. The enumeration starts at iota, so
// ProvisionTypeAPI is 0 (the zero value of ProvisionType) and
// ProvisionTypeConfig is 1.
const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
)

type Service

// Service exposes no exported fields; NewService returns a *Service.
type Service struct {
	// contains filtered or unexported fields
}

Service manages pipelines.

func NewService

func NewService(logger log.CtxLogger, db database.DB) *Service

NewService initializes and returns a pipeline Service.

func (*Service) AddConnector

func (s *Service) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)

AddConnector adds a connector to a pipeline.

func (*Service) AddProcessor

func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)

AddProcessor adds a processor to a pipeline.

func (*Service) Check added in v0.5.1

func (s *Service) Check(ctx context.Context) error

func (*Service) Create

func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error)

Create will create a new pipeline instance with the given config and return it if it was successfully saved to the database.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, pipelineID string) error

Delete removes a pipeline instance from the Service.

func (*Service) Get

func (s *Service) Get(_ context.Context, id string) (*Instance, error)

Get will return a single pipeline instance or an error.

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init fetches instances from the store without running any. Connectors and processors should be initialized before calling this function.

func (*Service) List

func (s *Service) List(context.Context) map[string]*Instance

List returns all pipeline instances in the Service.

func (*Service) OnFailure added in v0.6.1

func (s *Service) OnFailure(handler FailureHandler)

OnFailure registers a handler for a pipeline.FailureEvent. Only errors that happen after a pipeline has been started are sent to the handler.

func (*Service) RemoveConnector

func (s *Service) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)

RemoveConnector removes a connector from a pipeline.

func (*Service) RemoveProcessor

func (s *Service) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)

RemoveProcessor removes a processor from a pipeline.

func (*Service) Run added in v0.4.0

func (s *Service) Run(
	ctx context.Context,
	connFetcher ConnectorFetcher,
	procFetcher ProcessorFetcher,
	pluginFetcher PluginDispenserFetcher,
) error

Run runs the pipelines that were in the running state in the store.

func (*Service) Start

func (s *Service) Start(
	ctx context.Context,
	connFetcher ConnectorFetcher,
	procFetcher ProcessorFetcher,
	pluginFetcher PluginDispenserFetcher,
	pipelineID string,
) error

Start builds and starts a pipeline with the given ID. If the pipeline is already running, Start returns ErrPipelineRunning.

func (*Service) Stop

func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error

Stop will attempt to gracefully stop a given pipeline by calling each node's Stop function. If force is set to true the pipeline won't stop gracefully, instead the context for all nodes will be canceled which causes them to stop running as soon as possible.

func (*Service) StopAll

func (s *Service) StopAll(ctx context.Context, reason error)

StopAll will ask all the pipelines to stop gracefully (i.e. existing messages get processed, but no new messages get produced).

func (*Service) Update

func (s *Service) Update(ctx context.Context, pipelineID string, cfg Config) (*Instance, error)

Update will update a pipeline instance config.

func (*Service) UpdateDLQ added in v0.4.0

func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*Instance, error)

UpdateDLQ will update a pipeline DLQ config.

func (*Service) Wait

func (s *Service) Wait(timeout time.Duration) error

Wait blocks until all pipelines are stopped or until the timeout is reached. Returns:

(1) nil if all the pipelines are gracefully stopped,

(2) an error, if the pipelines could not be gracefully stopped,

(3) ErrTimeout if the pipelines were not stopped within the given timeout.

type Status

// Status is an int-backed enumeration with a String method. Its named values
// start at 1, so the zero value is not one of them.
type Status int

Status defines the running status of a pipeline.

// Values of Status. The enumeration starts at iota + 1, so StatusRunning is
// 1, StatusSystemStopped is 2, StatusUserStopped is 3 and StatusDegraded is
// 4; the zero value of Status matches none of these constants.
const (
	StatusRunning Status = iota + 1
	StatusSystemStopped
	StatusUserStopped
	StatusDegraded
)

func (Status) String

func (i Status) String() string

type Store

// Store exposes no exported fields; NewStore returns a *Store for a given
// database.DB.
type Store struct {
	// contains filtered or unexported fields
}

Store handles the persistence and fetching of pipeline instances.

func NewStore

func NewStore(db database.DB) *Store

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete deletes instance under the key id and returns nil on success, error otherwise.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*Instance, error)

Get will return the pipeline instance for a given id or an error.

func (*Store) GetAll

func (s *Store) GetAll(ctx context.Context) (map[string]*Instance, error)

GetAll returns all instances stored in the database.

func (*Store) Set

func (s *Store) Set(ctx context.Context, id string, instance *Instance) error

Set stores instance under the key id and returns nil on success, error otherwise.

Directories

Path Synopsis
stream
Package stream defines a message and nodes that can be composed into a data pipeline.
mock
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL