Documentation ¶
Overview ¶
Package lifecycle contains the logic to manage the lifecycle of pipelines. It is responsible for starting, stopping and managing pipelines.
Index ¶
- Constants
- type ConnectorPluginService
- type ConnectorService
- type DLQDestination
- type ErrRecoveryCfg
- type FailureEvent
- type FailureHandler
- type PipelineService
- type ProcessorService
- type Service
- func (s *Service) Init(ctx context.Context) error
- func (s *Service) OnFailure(handler FailureHandler)
- func (s *Service) Start(ctx context.Context, pipelineID string) error
- func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error
- func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error
- func (s *Service) StopAll(ctx context.Context, reason error)
- func (s *Service) Wait(timeout time.Duration) error
- func (s *Service) WaitPipeline(id string) error
Constants ¶
const InfiniteRetriesErrRecovery = -1
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConnectorPluginService ¶
type ConnectorPluginService interface {
NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error)
}
ConnectorPluginService can create a connector plugin dispenser.
type ConnectorService ¶
type ConnectorService interface { Get(ctx context.Context, id string) (*connector.Instance, error) Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, cfg connector.Config, p connector.ProvisionType) (*connector.Instance, error) }
ConnectorService can fetch and create a connector instance.
type DLQDestination ¶
type DLQDestination struct { Destination stream.Destination Logger log.CtxLogger // contains filtered or unexported fields }
DLQDestination is a DLQ handler that forwards DLQ records to a destination connector.
func (*DLQDestination) Close ¶
func (d *DLQDestination) Close(ctx context.Context) (err error)
Close stops the destination and tears it down.
type ErrRecoveryCfg ¶
type FailureEvent ¶
type FailureHandler ¶
type FailureHandler func(FailureEvent)
type PipelineService ¶
type PipelineService interface { Get(ctx context.Context, pipelineID string) (*pipeline.Instance, error) List(ctx context.Context) map[string]*pipeline.Instance UpdateStatus(ctx context.Context, pipelineID string, status pipeline.Status, errMsg string) error }
PipelineService can fetch, list and update the status of a pipeline instance.
type ProcessorService ¶
type ProcessorService interface { Get(ctx context.Context, id string) (*processor.Instance, error) MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error) }
ProcessorService can fetch a processor instance and make a runnable processor from it.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages pipelines.
func NewService ¶
func NewService( logger log.CtxLogger, errRecoveryCfg *ErrRecoveryCfg, connectors ConnectorService, processors ProcessorService, connectorPlugins ConnectorPluginService, pipelines PipelineService, ) *Service
NewService initializes and returns a lifecycle.Service.
func (*Service) OnFailure ¶
func (s *Service) OnFailure(handler FailureHandler)
OnFailure registers a handler for a lifecycle.FailureEvent. Only errors which happen after a pipeline has been started are being sent.
func (*Service) Start ¶
Start builds and starts a pipeline with the given ID. If the pipeline is already running, Start returns ErrPipelineRunning.
func (*Service) StartWithBackoff ¶
StartWithBackoff starts a pipeline with a backoff. It'll check the number of times the pipeline has been restarted and the duration of the backoff. When the pipeline has reached out the maximum number of retries, it'll return a fatal error.
func (*Service) Stop ¶
Stop will attempt to gracefully stop a given pipeline by calling each node's Stop function. If force is set to true the pipeline won't stop gracefully, instead the context for all nodes will be canceled which causes them to stop running as soon as possible.
func (*Service) StopAll ¶
StopAll will ask all the running pipelines to stop gracefully (i.e. that existing messages get processed but not new messages get produced).
func (*Service) Wait ¶
Wait blocks until all pipelines are stopped or until the timeout is reached. Returns:
(1) nil if all the pipelines are gracefully stopped,
(2) an error, if the pipelines could not have been gracefully stopped,
(3) context.DeadlineExceeded if the pipelines were not stopped within the given timeout.
func (*Service) WaitPipeline ¶
WaitPipeline blocks until the pipeline with the given ID is stopped.