Documentation ¶
Overview ¶
Package lifecycle contains the logic to manage the lifecycle of pipelines. It is responsible for starting, stopping and managing pipelines.
Index ¶
- type ConnectorPluginService
- type ConnectorService
- type FailureEvent
- type FailureHandler
- type PipelineService
- type ProcessorService
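- type Service
- func NewService(logger log.CtxLogger, connectors ConnectorService, processors ProcessorService, connectorPlugins ConnectorPluginService, pipelines PipelineService) *Service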
- func (s *Service) Init(ctx context.Context) error
- func (s *Service) OnFailure(handler FailureHandler)
- func (s *Service) Start(ctx context.Context, pipelineID string) error
- func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error
- func (s *Service) StopAll(ctx context.Context, force bool) error
- func (s *Service) Wait(timeout time.Duration) error
- func (s *Service) WaitPipeline(id string) error
Types ¶
type ConnectorPluginService ¶
type ConnectorPluginService interface {
	NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error)
}
ConnectorPluginService can create a connector plugin dispenser.
type ConnectorService ¶
type ConnectorService interface {
	Get(ctx context.Context, id string) (*connector.Instance, error)
	Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, cfg connector.Config, p connector.ProvisionType) (*connector.Instance, error)
}
ConnectorService can fetch and create a connector instance.
type FailureEvent ¶
type FailureHandler ¶
type FailureHandler func(FailureEvent)
type PipelineService ¶
type PipelineService interface {
	Get(ctx context.Context, pipelineID string) (*pipeline.Instance, error)
	List(ctx context.Context) map[string]*pipeline.Instance
	UpdateStatus(ctx context.Context, pipelineID string, status pipeline.Status, errMsg string) error
}
PipelineService can fetch, list and update the status of a pipeline instance.
type ProcessorService ¶
type ProcessorService interface {
	Get(ctx context.Context, id string) (*processor.Instance, error)
	MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error)
}
ProcessorService can fetch a processor instance and make a runnable processor from it.
type Service ¶
type Service struct {
	// contains filtered or unexported fields
}
Service manages pipelines.
func NewService ¶
func NewService(
	logger log.CtxLogger,
	connectors ConnectorService,
	processors ProcessorService,
	connectorPlugins ConnectorPluginService,
	pipelines PipelineService,
) *Service
NewService initializes and returns a lifecycle.Service.
func (*Service) OnFailure ¶
func (s *Service) OnFailure(handler FailureHandler)
OnFailure registers a handler for a lifecycle.FailureEvent. Only errors that occur after a pipeline has been started are sent to the handler.
func (*Service) Start ¶
func (s *Service) Start(ctx context.Context, pipelineID string) error
Start builds and starts the pipeline with the given ID. If the pipeline is already running, Start returns ErrPipelineRunning.
func (*Service) Stop ¶
func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error
Stop attempts to gracefully stop the given pipeline by calling each worker's Stop method. If force is true, the pipeline is stopped forcefully by cancelling the context.
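The difference between the two stop modes can be illustrated with a small, self-contained sketch. It is not the package's implementation: `worker`, `runningPipeline` and `runAndStop` are hypothetical names, and each worker simply reports which signal ended it.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker is a hypothetical pipeline worker with its own stop signal.
type worker struct {
	stop chan struct{}
	once sync.Once
}

func newWorker() *worker { return &worker{stop: make(chan struct{})} }

// Stop asks the worker to finish gracefully; it is safe to call repeatedly.
func (w *worker) Stop() { w.once.Do(func() { close(w.stop) }) }

// run blocks until the worker is stopped and reports how it ended.
func (w *worker) run(ctx context.Context) string {
	select {
	case <-w.stop:
		return "graceful"
	case <-ctx.Done():
		return "forced"
	}
}

// runningPipeline groups the workers with the cancel function of their context.
type runningPipeline struct {
	workers []*worker
	cancel  context.CancelFunc
}

// stop cancels the shared context when force is true;
// otherwise it calls Stop on each worker.
func (p *runningPipeline) stop(force bool) {
	if force {
		p.cancel()
		return
	}
	for _, w := range p.workers {
		w.Stop()
	}
}

// runAndStop starts two workers, stops them, and returns how each one ended.
func runAndStop(force bool) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	p := &runningPipeline{workers: []*worker{newWorker(), newWorker()}, cancel: cancel}

	results := make([]string, len(p.workers))
	var wg sync.WaitGroup
	for i, w := range p.workers {
		wg.Add(1)
		go func(i int, w *worker) {
			defer wg.Done()
			results[i] = w.run(ctx)
		}(i, w)
	}
	p.stop(force)
	wg.Wait()
	return results
}

func main() {
	fmt.Println(runAndStop(false)) // [graceful graceful]
	fmt.Println(runAndStop(true))  // [forced forced]
}
```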
func (*Service) StopAll ¶
func (s *Service) StopAll(ctx context.Context, force bool) error
StopAll asks all running pipelines to stop gracefully (i.e. existing messages are processed, but no new messages are produced).
func (*Service) Wait ¶
func (s *Service) Wait(timeout time.Duration) error
Wait blocks until all pipelines are stopped or until the timeout is reached. It returns:
(1) nil if all pipelines stopped gracefully,
(2) an error if the pipelines could not be stopped gracefully,
(3) context.DeadlineExceeded if the pipelines did not stop within the given timeout.
func (*Service) WaitPipeline ¶
func (s *Service) WaitPipeline(id string) error
WaitPipeline blocks until the pipeline with the given ID is stopped.