Documentation ¶
Index ¶
- Variables
- type Config
- type ConnectorFetcher
- type DLQ
- type DLQDestination
- type Instance
- type PluginDispenserFetcher
- type ProcessorFetcher
- type ProvisionType
- type Service
- func (s *Service) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
- func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
- func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error)
- func (s *Service) Delete(ctx context.Context, pipelineID string) error
- func (s *Service) Get(ctx context.Context, id string) (*Instance, error)
- func (s *Service) Init(ctx context.Context) error
- func (s *Service) List(ctx context.Context) map[string]*Instance
- func (s *Service) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
- func (s *Service) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
- func (s *Service) Run(ctx context.Context, connFetcher ConnectorFetcher, ...) error
- func (s *Service) Start(ctx context.Context, connFetcher ConnectorFetcher, ...) error
- func (s *Service) Stop(ctx context.Context, pipelineID string) error
- func (s *Service) StopAll(ctx context.Context, reason error)
- func (s *Service) Update(ctx context.Context, pipelineID string, cfg Config) (*Instance, error)
- func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*Instance, error)
- func (s *Service) Wait(timeout time.Duration) error
- type Status
- type Store
Constants ¶
This section is empty.
Variables ¶
var (
	ErrTimeout             = cerrors.New("operation timed out")
	ErrGracefulShutdown    = cerrors.New("graceful shutdown")
	ErrPipelineRunning     = cerrors.New("pipeline is running")
	ErrPipelineNotRunning  = cerrors.New("pipeline not running")
	ErrInstanceNotFound    = cerrors.New("pipeline instance not found")
	ErrNameMissing         = cerrors.New("must provide a pipeline name")
	ErrNameAlreadyExists   = cerrors.New("pipeline name already exists")
	ErrConnectorIDNotFound = cerrors.New("connector ID not found")
	ErrProcessorIDNotFound = cerrors.New("processor ID not found")
)
Functions ¶
This section is empty.
Types ¶
type ConnectorFetcher ¶
type ConnectorFetcher interface {
	Get(ctx context.Context, id string) (*connector.Instance, error)
	Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, cfg connector.Config, p connector.ProvisionType) (*connector.Instance, error)
}
ConnectorFetcher can fetch and create connector instances.
type DLQDestination ¶ added in v0.4.0
type DLQDestination struct {
	Destination stream.Destination
	Logger      log.CtxLogger
	// contains filtered or unexported fields
}
DLQDestination is a DLQ handler that forwards DLQ records to a destination connector.
func (*DLQDestination) Close ¶ added in v0.4.0
func (d *DLQDestination) Close(ctx context.Context) (err error)
Close stops the destination and tears it down.
func (*DLQDestination) Open ¶ added in v0.4.0
func (d *DLQDestination) Open(ctx context.Context) error
type Instance ¶
type Instance struct {
	ID            string
	Config        Config
	Status        Status
	Error         string
	CreatedAt     time.Time
	UpdatedAt     time.Time
	ProvisionedBy ProvisionType
	DLQ           DLQ
	ConnectorIDs  []string
	ProcessorIDs  []string
	// contains filtered or unexported fields
}
Instance manages a collection of Connectors, which can be either Destination or Source. The pipeline sets up its publishers and subscribers based on whether the Connector in question is a Destination or a Source.
type PluginDispenserFetcher ¶ added in v0.5.0
type PluginDispenserFetcher interface {
NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
}
PluginDispenserFetcher can fetch a plugin.
type ProcessorFetcher ¶
type ProcessorFetcher interface {
Get(ctx context.Context, id string) (*processor.Instance, error)
}
ProcessorFetcher can fetch a processor instance.
type ProvisionType ¶ added in v0.3.0
type ProvisionType int
ProvisionType defines how a pipeline was provisioned: through the API (ProvisionTypeAPI) or from configuration (ProvisionTypeConfig).
const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
)
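Because the constants are declared with iota, ProvisionTypeAPI is 0 (and therefore the zero value of ProvisionType) and ProvisionTypeConfig is 1. The sketch below redeclares the type locally to show this. describeProvision is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local redeclaration of the type and constants, for illustration only.
type ProvisionType int

const (
	ProvisionTypeAPI    ProvisionType = iota // 0, also the zero value
	ProvisionTypeConfig                      // 1
)

// describeProvision is a hypothetical helper that names a provision type.
func describeProvision(p ProvisionType) string {
	switch p {
	case ProvisionTypeAPI:
		return "api"
	case ProvisionTypeConfig:
		return "config"
	default:
		return fmt.Sprintf("unknown(%d)", int(p))
	}
}

func main() {
	var zero ProvisionType
	fmt.Println(describeProvision(zero))                // api
	fmt.Println(describeProvision(ProvisionTypeConfig)) // config
}
```

One consequence worth keeping in mind: an Instance whose ProvisionedBy field was never set reads as ProvisionTypeAPI.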
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages pipelines.
func NewService ¶
NewService initializes and returns a pipeline Service.
func (*Service) AddConnector ¶
func (s *Service) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
AddConnector adds a connector to a pipeline.
func (*Service) AddProcessor ¶
func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
AddProcessor adds a processor to a pipeline.
func (*Service) Create ¶
func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error)
Create will create a new pipeline instance with the given config and return it if it was successfully saved to the database.
func (*Service) Init ¶
Init fetches instances from the store without running any. Connectors and processors should be initialized before calling this function.
func (*Service) RemoveConnector ¶
func (s *Service) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
RemoveConnector removes a connector from a pipeline.
func (*Service) RemoveProcessor ¶
func (s *Service) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
RemoveProcessor removes a processor from a pipeline.
func (*Service) Run ¶ added in v0.4.0
func (s *Service) Run(
	ctx context.Context,
	connFetcher ConnectorFetcher,
	procFetcher ProcessorFetcher,
	pluginFetcher PluginDispenserFetcher,
) error
Run runs the pipelines that had the running state in the store.
func (*Service) Start ¶
func (s *Service) Start(
	ctx context.Context,
	connFetcher ConnectorFetcher,
	procFetcher ProcessorFetcher,
	pluginFetcher PluginDispenserFetcher,
	pipelineID string,
) error
Start builds and starts a pipeline instance.
func (*Service) Stop ¶
Stop will attempt to gracefully stop a given pipeline by calling each node's Stop function.
func (*Service) StopAll ¶
StopAll will ask all the pipelines to stop gracefully (i.e. existing messages get processed, but no new messages are produced).
func (*Service) Wait ¶
Wait blocks until all pipelines are stopped or until the timeout is reached. Returns:
(1) nil if all the pipelines are gracefully stopped,
(2) an error, if the pipelines could not be stopped gracefully,
(3) ErrTimeout if the pipelines were not stopped within the given timeout.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store handles the persistence and fetching of pipeline instances.
func (*Store) Delete ¶
Delete deletes the instance stored under the key id and returns nil on success, or an error otherwise.