lifecycle

package
v0.12.2
Warning

This package is not in the latest version of its module.

Published: Nov 8, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package lifecycle contains the logic to manage the lifecycle of pipelines. It is responsible for starting, stopping and managing pipelines.

Constants

const InfiniteRetriesErrRecovery = -1

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnectorPluginService

type ConnectorPluginService interface {
	NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error)
}

ConnectorPluginService can create a connector plugin dispenser.

type ConnectorService

type ConnectorService interface {
	Get(ctx context.Context, id string) (*connector.Instance, error)
	Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, cfg connector.Config, p connector.ProvisionType) (*connector.Instance, error)
}

ConnectorService can fetch and create a connector instance.

type DLQDestination

type DLQDestination struct {
	Destination stream.Destination
	Logger      log.CtxLogger
	// contains filtered or unexported fields
}

DLQDestination is a DLQ handler that forwards DLQ records to a destination connector.

func (*DLQDestination) Close

func (d *DLQDestination) Close(ctx context.Context) (err error)

Close stops the destination and tears it down.

func (*DLQDestination) Open

func (d *DLQDestination) Open(ctx context.Context) error

func (*DLQDestination) Write

func (d *DLQDestination) Write(ctx context.Context, rec opencdc.Record) error

Write writes the record synchronously to the destination, meaning that it waits until an ack is received for the record before it returns. If the record write fails or the destination returns a nack, the function returns an error.

type ErrRecoveryCfg

type ErrRecoveryCfg struct {
	MinDelay         time.Duration
	MaxDelay         time.Duration
	BackoffFactor    int
	MaxRetries       int64
	MaxRetriesWindow time.Duration
}

type FailureEvent

type FailureEvent struct {
	// ID is the ID of the pipeline which failed.
	ID    string
	Error error
}

type FailureHandler

type FailureHandler func(FailureEvent)

type PipelineService

type PipelineService interface {
	Get(ctx context.Context, pipelineID string) (*pipeline.Instance, error)
	List(ctx context.Context) map[string]*pipeline.Instance
	UpdateStatus(ctx context.Context, pipelineID string, status pipeline.Status, errMsg string) error
}

PipelineService can fetch, list and update the status of a pipeline instance.

type ProcessorService

type ProcessorService interface {
	Get(ctx context.Context, id string) (*processor.Instance, error)
	MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error)
}

ProcessorService can fetch a processor instance and make a runnable processor from it.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages pipelines.

func NewService

func NewService(
	logger log.CtxLogger,
	errRecoveryCfg *ErrRecoveryCfg,
	connectors ConnectorService,
	processors ProcessorService,
	connectorPlugins ConnectorPluginService,
	pipelines PipelineService,
) *Service

NewService initializes and returns a lifecycle.Service.

func (*Service) Init

func (s *Service) Init(
	ctx context.Context,
) error

Init starts all pipelines that have the status StatusSystemStopped.

func (*Service) OnFailure

func (s *Service) OnFailure(handler FailureHandler)

OnFailure registers a handler for a lifecycle.FailureEvent. Only errors that occur after a pipeline has been started are sent.

func (*Service) Start

func (s *Service) Start(
	ctx context.Context,
	pipelineID string,
) error

Start builds and starts a pipeline with the given ID. If the pipeline is already running, Start returns ErrPipelineRunning.

func (*Service) StartWithBackoff

func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error

StartWithBackoff starts a pipeline with a backoff. It checks the number of times the pipeline has been restarted and the duration of the backoff. When the pipeline has reached the maximum number of retries, it returns a fatal error.

func (*Service) Stop

func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error

Stop attempts to gracefully stop the given pipeline by calling each node's Stop function. If force is set to true, the pipeline is not stopped gracefully; instead, the context for all nodes is canceled, which causes them to stop running as soon as possible.
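
The difference between a graceful and a forced stop can be sketched with a single node that reads from a channel. This is a simplified model and not the package's implementation: runNode and stopAndCount are hypothetical, a graceful stop is modeled by closing the input so that queued messages are still drained, and a forced stop is modeled by canceling the node's context.

```go
package main

import (
	"context"
	"fmt"
)

// runNode is a hypothetical pipeline node. It processes messages until the
// input is closed (graceful stop) or the context is canceled (forced stop),
// and returns the number of messages it processed.
func runNode(ctx context.Context, in <-chan string) int {
	processed := 0
	for {
		// Check for cancellation first so a forced stop wins over
		// messages that are still queued.
		select {
		case <-ctx.Done():
			return processed
		default:
		}
		select {
		case <-ctx.Done():
			return processed
		case _, ok := <-in:
			if !ok {
				return processed // input closed and fully drained
			}
			processed++
		}
	}
}

// stopAndCount queues three messages, stops the node either gracefully or
// by force, and reports how many messages were still processed.
func stopAndCount(force bool) int {
	in := make(chan string, 3)
	in <- "a"
	in <- "b"
	in <- "c"
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if force {
		cancel() // forced: nodes stop as soon as possible
	} else {
		close(in) // graceful: no new messages, queued ones are drained
	}
	return runNode(ctx, in)
}

func main() {
	fmt.Println("graceful:", stopAndCount(false)) // graceful: 3
	fmt.Println("forced:", stopAndCount(true))    // forced: 0
}
```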

func (*Service) StopAll

func (s *Service) StopAll(ctx context.Context, reason error)

StopAll asks all running pipelines to stop gracefully (i.e. existing messages are processed, but no new messages are produced).

func (*Service) Wait

func (s *Service) Wait(timeout time.Duration) error

Wait blocks until all pipelines are stopped or until the timeout is reached. Returns:

(1) nil if all the pipelines are gracefully stopped,

(2) an error, if the pipelines could not be gracefully stopped,

(3) context.DeadlineExceeded if the pipelines were not stopped within the given timeout.

func (*Service) WaitPipeline

func (s *Service) WaitPipeline(id string) error

WaitPipeline blocks until the pipeline with the given ID is stopped.

Directories

Path    Synopsis
stream  Package stream defines a message and nodes that can be composed into a data pipeline.
mock    Package mock is a generated GoMock package.
