Documentation ¶
Index ¶
- Variables
- type Config
- type ConnectorFetcher
- type Instance
- type ProcessorFetcher
- type Service
- func (s *Service) AddConnector(ctx context.Context, pl *Instance, connectorID string) (*Instance, error)
- func (s *Service) AddProcessor(ctx context.Context, pl *Instance, processorID string) (*Instance, error)
- func (s *Service) Create(ctx context.Context, id string, cfg Config) (*Instance, error)
- func (s *Service) Delete(ctx context.Context, pl *Instance) error
- func (s *Service) Get(ctx context.Context, id string) (*Instance, error)
- func (s *Service) Init(ctx context.Context, connFetcher ConnectorFetcher, ...) error
- func (s *Service) List(ctx context.Context) map[string]*Instance
- func (s *Service) RemoveConnector(ctx context.Context, pl *Instance, connectorID string) (*Instance, error)
- func (s *Service) RemoveProcessor(ctx context.Context, pl *Instance, processorID string) (*Instance, error)
- func (s *Service) Start(ctx context.Context, connFetcher ConnectorFetcher, ...) error
- func (s *Service) Stop(ctx context.Context, pl *Instance) error
- func (s *Service) StopAll(ctx context.Context, reason error)
- func (s *Service) Update(ctx context.Context, pl *Instance, cfg Config) (*Instance, error)
- func (s *Service) Wait(timeout time.Duration) error
- type Status
- type Store
Constants ¶
This section is empty.
Variables ¶
var (
	ErrTimeout             = cerrors.New("operation timed out")
	ErrGracefulShutdown    = cerrors.New("graceful shutdown")
	ErrPipelineRunning     = cerrors.New("pipeline is running")
	ErrPipelineNotRunning  = cerrors.New("pipeline not running")
	ErrInstanceNotFound    = cerrors.New("pipeline instance not found")
	ErrNameMissing         = cerrors.New("must provide a pipeline name")
	ErrNameAlreadyExists   = cerrors.New("pipeline name already exists")
	ErrConnectorIDNotFound = cerrors.New("connector ID not found")
	ErrProcessorIDNotFound = cerrors.New("processor ID not found")
)
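Sentinel errors like these are usually matched with errors.Is rather than by comparing message strings. The following is a minimal sketch of that pattern, not code from this package: it redeclares two of the errors locally with the standard library's errors.New, so it assumes the cerrors values behave the same way under errors.Is. The helpers describe and wrapGet are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors.
// Assumption: the real values are matchable with errors.Is as well.
var (
	ErrPipelineRunning  = errors.New("pipeline is running")
	ErrInstanceNotFound = errors.New("pipeline instance not found")
)

// describe (hypothetical helper) maps an error to a short label by
// checking it against the sentinels, including through wrapping.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrInstanceNotFound):
		return "not found"
	case errors.Is(err, ErrPipelineRunning):
		return "running"
	default:
		return "unexpected: " + err.Error()
	}
}

// wrapGet (hypothetical helper) simulates a caller that adds context
// to an error while keeping the sentinel in the chain via %w.
func wrapGet(id string, err error) error {
	return fmt.Errorf("could not get pipeline %q: %w", id, err)
}
```

Because wrapGet wraps with %w, describe(wrapGet("p1", ErrInstanceNotFound)) still resolves to the "not found" branch.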
Functions ¶
This section is empty.
Types ¶
type ConnectorFetcher ¶
type ConnectorFetcher interface {
Get(ctx context.Context, id string) (connector.Connector, error)
}
ConnectorFetcher can fetch a connector instance.
type Instance ¶
type Instance struct {
	ID           string
	Config       Config
	Status       Status
	Error        string
	ConnectorIDs []string
	ProcessorIDs []string
	// contains filtered or unexported fields
}
Instance manages a collection of Connectors, which can be either Destination or Source. The pipeline sets up its publishers and subscribers based on whether the Connector in question is a Destination or a Source.
type ProcessorFetcher ¶
type ProcessorFetcher interface {
Get(ctx context.Context, id string) (*processor.Instance, error)
}
ProcessorFetcher can fetch a processor instance.
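Both fetcher interfaces consist of a single Get method keyed by ID, so a map-backed type is enough to satisfy that shape in tests. The sketch below is illustrative only: Processor stands in for *processor.Instance, and errNotFound, mapFetcher and lookup are hypothetical names that do not exist in this package.

```go
package main

import (
	"context"
	"errors"
)

// Processor is a hypothetical stand-in for *processor.Instance.
type Processor struct{ ID string }

// errNotFound is a hypothetical error used only in this sketch.
var errNotFound = errors.New("processor not found")

// mapFetcher resolves processors from an in-memory map.
type mapFetcher map[string]*Processor

// Get mirrors the shape of ProcessorFetcher.Get: it takes a context
// and an ID and returns the matching instance or an error.
func (f mapFetcher) Get(ctx context.Context, id string) (*Processor, error) {
	// Respect cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	p, ok := f[id]
	if !ok {
		return nil, errNotFound
	}
	return p, nil
}

// lookup (hypothetical helper) calls Get with a background context.
func lookup(f mapFetcher, id string) (*Processor, error) {
	return f.Get(context.Background(), id)
}
```

A fetcher for connectors could follow the same pattern with the connector type in place of Processor.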
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages pipelines.
func NewService ¶
NewService initializes and returns a pipeline Service.
func (*Service) AddConnector ¶
func (s *Service) AddConnector(ctx context.Context, pl *Instance, connectorID string) (*Instance, error)
AddConnector adds a connector to a pipeline.
func (*Service) AddProcessor ¶
func (s *Service) AddProcessor(ctx context.Context, pl *Instance, processorID string) (*Instance, error)
AddProcessor adds a processor to a pipeline.
func (*Service) Create ¶
Create creates a new pipeline instance with the given config and returns it if it was successfully saved to the database.
func (*Service) Init ¶
func (s *Service) Init(ctx context.Context, connFetcher ConnectorFetcher, procFetcher ProcessorFetcher) error
Init fetches instances from the store and starts pipelines that are supposed to be running. Connectors and processors should be initialized before calling this function.
func (*Service) RemoveConnector ¶
func (s *Service) RemoveConnector(ctx context.Context, pl *Instance, connectorID string) (*Instance, error)
RemoveConnector removes a connector from a pipeline.
func (*Service) RemoveProcessor ¶
func (s *Service) RemoveProcessor(ctx context.Context, pl *Instance, processorID string) (*Instance, error)
RemoveProcessor removes a processor from a pipeline.
func (*Service) Start ¶
func (s *Service) Start(ctx context.Context, connFetcher ConnectorFetcher, procFetcher ProcessorFetcher, pl *Instance) error
Start builds and starts a pipeline instance.
func (*Service) Stop ¶
Stop will attempt to gracefully stop a given pipeline by calling each node's Stop function.
func (*Service) StopAll ¶
StopAll asks all pipelines to stop gracefully, i.e. messages that already exist get processed, but no new messages are produced.
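The idea of a graceful stop can be illustrated with a generic producer and consumer. This is a sketch of the general technique under simplified assumptions, not the way this package implements StopAll; the function runUntilStopped and its channels are hypothetical. The producer stops emitting once the stop signal is observed, and the consumer drains everything that was already sent.

```go
package main

import "sync"

// runUntilStopped (hypothetical) produces messages until a stop signal
// is raised after stopAfter messages, then lets the consumer drain the
// channel. It returns how many messages were produced and processed.
func runUntilStopped(stopAfter int) (produced, processed int) {
	msgs := make(chan int, 8)
	stop := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The consumer keeps going until msgs is closed and empty,
		// so messages already in flight are still processed.
		for range msgs {
			processed++
		}
	}()

	for i := 0; ; i++ {
		if i == stopAfter {
			close(stop) // request a graceful stop
		}
		select {
		case <-stop:
			// No new messages are produced after this point.
			close(msgs)
			wg.Wait() // wait for the consumer to finish draining
			return produced, processed
		default:
			msgs <- i
			produced++
		}
	}
}
```

After the function returns, the two counters are equal: nothing produced before the stop request is dropped.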
func (*Service) Wait ¶
Wait blocks until all pipelines are stopped or until the timeout is reached. Returns:
(1) nil if all the pipelines are gracefully stopped,
(2) an error if the pipelines could not be stopped gracefully,
(3) ErrTimeout if the pipelines were not stopped within the given timeout.
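The three outcomes listed above map naturally onto a select over a completion channel and a timer. The sketch below shows that general pattern with stand-in names; it is not the package's implementation of Wait. ErrTimeout is redeclared locally with the standard library, and waitFor, errForced and the done channel are assumptions made for illustration.

```go
package main

import (
	"errors"
	"time"
)

// ErrTimeout is a local stand-in for the package's sentinel error.
var ErrTimeout = errors.New("operation timed out")

// errForced is a hypothetical error representing a non-graceful stop.
var errForced = errors.New("forced stop")

// waitFor (hypothetical) blocks until a result arrives on done or the
// timeout elapses. It returns:
//   - nil if done delivers nil (graceful stop),
//   - the delivered error if the stop was not graceful,
//   - ErrTimeout if nothing arrives within the timeout.
func waitFor(done <-chan error, timeout time.Duration) error {
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return ErrTimeout
	}
}
```

A caller can then distinguish a timeout from other failures with errors.Is(err, ErrTimeout) and decide whether to keep waiting or give up.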
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store handles the persistence and fetching of pipeline instances.
func (*Store) Delete ¶
Delete deletes the instance stored under the key id. It returns nil on success and an error otherwise.