Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type ConnectorFetcher
- type DLQ
- type DLQDestination
- type FailureEvent
- type FailureHandler
- type Instance
- type PluginDispenserFetcher
- type ProcessorService
- type ProvisionType
- type Service
- func (s *Service) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
- func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
- func (s *Service) Check(ctx context.Context) error
- func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error)
- func (s *Service) Delete(ctx context.Context, pipelineID string) error
- func (s *Service) Get(_ context.Context, id string) (*Instance, error)
- func (s *Service) Init(ctx context.Context) error
- func (s *Service) List(context.Context) map[string]*Instance
- func (s *Service) OnFailure(handler FailureHandler)
- func (s *Service) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
- func (s *Service) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
- func (s *Service) Run(ctx context.Context, connFetcher ConnectorFetcher, ...) error
- func (s *Service) Start(ctx context.Context, connFetcher ConnectorFetcher, ...) error
- func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error
- func (s *Service) StopAll(ctx context.Context, reason error)
- func (s *Service) Update(ctx context.Context, pipelineID string, cfg Config) (*Instance, error)
- func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*Instance, error)
- func (s *Service) Wait(timeout time.Duration) error
- type Status
- type Store
Constants ¶
const (
	IDLengthLimit          = 128
	NameLengthLimit        = 128
	DescriptionLengthLimit = 8192
)
Variables ¶
var (
	ErrTimeout              = cerrors.New("operation timed out")
	ErrGracefulShutdown     = cerrors.New("graceful shutdown")
	ErrPipelineRunning      = cerrors.New("pipeline is running")
	ErrPipelineNotRunning   = cerrors.New("pipeline not running")
	ErrInstanceNotFound     = cerrors.New("pipeline instance not found")
	ErrNameMissing          = cerrors.New("must provide a pipeline name")
	ErrIDMissing            = cerrors.New("must provide a pipeline ID")
	ErrNameAlreadyExists    = cerrors.New("pipeline name already exists")
	ErrInvalidCharacters    = cerrors.New("pipeline ID contains invalid characters")
	ErrNameOverLimit        = cerrors.New("pipeline name is over the character limit (64)")
	ErrIDOverLimit          = cerrors.New("pipeline ID is over the character limit (64)")
	ErrDescriptionOverLimit = cerrors.New("pipeline description is over the character limit (8192)")
	ErrConnectorIDNotFound  = cerrors.New("connector ID not found")
	ErrProcessorIDNotFound  = cerrors.New("processor ID not found")
)
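The length limits and the sentinel errors above suggest a validation step for a pipeline's ID, name, and description. The following is a minimal sketch of such a check, not the package's implementation: `validate` and `longID` are hypothetical names, the errors are local stand-ins built with the standard `errors` package instead of `cerrors`, and measuring length in bytes is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Limits copied from the Constants section.
const (
	IDLengthLimit          = 128
	NameLengthLimit        = 128
	DescriptionLengthLimit = 8192
)

// Stand-ins for the package's sentinel errors. The package creates them with
// cerrors.New; the standard errors package keeps this sketch dependency-free.
var (
	ErrIDMissing            = errors.New("must provide a pipeline ID")
	ErrNameMissing          = errors.New("must provide a pipeline name")
	ErrIDOverLimit          = errors.New("pipeline ID is over the character limit")
	ErrNameOverLimit        = errors.New("pipeline name is over the character limit")
	ErrDescriptionOverLimit = errors.New("pipeline description is over the character limit")
)

// longID is one byte longer than IDLengthLimit and triggers ErrIDOverLimit.
var longID = strings.Repeat("a", IDLengthLimit+1)

// validate is a hypothetical helper (not part of the package API). It returns
// the first violated rule. Lengths are measured in bytes, which is an assumption.
func validate(id, name, description string) error {
	switch {
	case id == "":
		return ErrIDMissing
	case len(id) > IDLengthLimit:
		return ErrIDOverLimit
	case name == "":
		return ErrNameMissing
	case len(name) > NameLengthLimit:
		return ErrNameOverLimit
	case len(description) > DescriptionLengthLimit:
		return ErrDescriptionOverLimit
	}
	return nil
}

func main() {
	fmt.Println(validate("", "orders", ""))
	fmt.Println(errors.Is(validate(longID, "orders", ""), ErrIDOverLimit))
	fmt.Println(validate("orders-1", "orders", "copies orders to the warehouse"))
}
```

Returning sentinel values lets callers branch on the failure with `errors.Is` instead of comparing message strings.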
var DefaultDLQ = DLQ{
	Plugin: "builtin:log",
	Settings: map[string]string{
		"level":   "warn",
		"message": "record delivery failed",
	},
	WindowSize:          1,
	WindowNackThreshold: 0,
}
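This page does not explain how WindowSize and WindowNackThreshold are interpreted. The sketch below assumes one plausible reading: the window tracks the most recent WindowSize delivery results, and the pipeline should stop once the number of nacks inside the window exceeds WindowNackThreshold. Under that assumption, the default values (1 and 0) mean the first nacked record crosses the threshold. `nackWindow` and its methods are hypothetical names, not part of the package.

```go
package main

import "fmt"

// nackWindow is a hypothetical sliding window over the most recent results.
type nackWindow struct {
	results   []bool // ring buffer; true marks a nacked record
	next      int    // index of the slot that will be overwritten next
	nacks     int    // number of nacks currently inside the window
	threshold int    // tolerated number of nacks (assumed semantics)
}

func newNackWindow(size, threshold int) *nackWindow {
	return &nackWindow{results: make([]bool, size), threshold: threshold}
}

// record stores one delivery result and reports whether the number of nacks
// in the window now exceeds the threshold.
func (w *nackWindow) record(nack bool) bool {
	if len(w.results) == 0 {
		return false // assumption: a window of size 0 never trips
	}
	if w.results[w.next] {
		w.nacks-- // the oldest result leaves the window
	}
	w.results[w.next] = nack
	if nack {
		w.nacks++
	}
	w.next = (w.next + 1) % len(w.results)
	return w.nacks > w.threshold
}

func main() {
	w := newNackWindow(1, 0) // values taken from DefaultDLQ
	fmt.Println(w.record(false))
	fmt.Println(w.record(true))
}
```

A larger window with a non-zero threshold would tolerate occasional failed records while still stopping on a burst of failures.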
Functions ¶
This section is empty.
Types ¶
type ConnectorFetcher ¶
type ConnectorFetcher interface {
	Get(ctx context.Context, id string) (*connector.Instance, error)
	Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, cfg connector.Config, p connector.ProvisionType) (*connector.Instance, error)
}
ConnectorFetcher can fetch and create connector instances.
type DLQDestination ¶ added in v0.4.0
type DLQDestination struct {
	Destination stream.Destination
	Logger      log.CtxLogger
	// contains filtered or unexported fields
}
DLQDestination is a DLQ handler that forwards DLQ records to a destination connector.
func (*DLQDestination) Close ¶ added in v0.4.0
func (d *DLQDestination) Close(ctx context.Context) (err error)
Close stops the destination and tears it down.
func (*DLQDestination) Open ¶ added in v0.4.0
func (d *DLQDestination) Open(ctx context.Context) error
type FailureEvent ¶ added in v0.6.1
type FailureHandler ¶ added in v0.6.1
type FailureHandler func(FailureEvent)
type Instance ¶
type Instance struct {
	ID            string
	Config        Config
	Status        Status
	Error         string
	CreatedAt     time.Time
	UpdatedAt     time.Time
	ProvisionedBy ProvisionType
	DLQ           DLQ
	ConnectorIDs  []string
	ProcessorIDs  []string
	// contains filtered or unexported fields
}
Instance manages a collection of Connectors, which can be either Destination or Source. The pipeline sets up its publishers and subscribers based on whether the Connector in question is a Destination or a Source.
type PluginDispenserFetcher ¶ added in v0.5.0
type PluginDispenserFetcher interface {
NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
}
PluginDispenserFetcher can fetch a plugin.
type ProcessorService ¶ added in v0.9.0
type ProcessorService interface {
	Get(ctx context.Context, id string) (*processor.Instance, error)
	MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error)
}
ProcessorService can fetch a processor instance and make a runnable processor from it.
type ProvisionType ¶ added in v0.3.0
type ProvisionType int
ProvisionType defines how a pipeline was provisioned.
const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
)
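Because the constants are declared with iota, ProvisionTypeAPI is 0 and ProvisionTypeConfig is 1, so the zero value of ProvisionType is ProvisionTypeAPI. The sketch below redeclares the type locally to show this; `describe` is a hypothetical helper and its labels are illustrative, not names used by the package.

```go
package main

import "fmt"

// Local copy of the declaration above so the sketch is self-contained.
type ProvisionType int

const (
	ProvisionTypeAPI    ProvisionType = iota // 0
	ProvisionTypeConfig                      // 1
)

// describe is a hypothetical helper that maps a ProvisionType to a label.
func describe(p ProvisionType) string {
	switch p {
	case ProvisionTypeAPI:
		return "api"
	case ProvisionTypeConfig:
		return "config"
	default:
		return fmt.Sprintf("unknown(%d)", int(p))
	}
}

func main() {
	var zero ProvisionType // zero value equals ProvisionTypeAPI
	fmt.Println(zero == ProvisionTypeAPI, describe(zero))
	fmt.Println(int(ProvisionTypeConfig), describe(ProvisionTypeConfig))
}
```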
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages pipelines.
func NewService ¶
NewService initializes and returns a pipeline Service.
func (*Service) AddConnector ¶
func (s *Service) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
AddConnector adds a connector to a pipeline.
func (*Service) AddProcessor ¶
func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
AddProcessor adds a processor to a pipeline.
func (*Service) Create ¶
func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error)
Create will create a new pipeline instance with the given config and return it if it was successfully saved to the database.
func (*Service) Init ¶
func (s *Service) Init(ctx context.Context) error
Init fetches instances from the store without running any. Connectors and processors should be initialized before calling this function.
func (*Service) OnFailure ¶ added in v0.6.1
func (s *Service) OnFailure(handler FailureHandler)
OnFailure registers a handler for a pipeline.FailureEvent. Only errors that occur after a pipeline has been started are sent to the handler.
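The registration pattern can be sketched as follows. This is an illustration under stated assumptions, not the package's code: the fields of `FailureEvent` are not shown on this page, so the `ID` and `Error` fields are hypothetical, as are the `service` type, its `notify` method, and the choice to keep multiple handlers.

```go
package main

import (
	"errors"
	"fmt"
)

// FailureEvent is a local stand-in; its fields are assumed for illustration.
type FailureEvent struct {
	ID    string // hypothetical: ID of the failed pipeline
	Error error  // hypothetical: the error that stopped it
}

// FailureHandler matches the type shown above.
type FailureHandler func(FailureEvent)

// service is a hypothetical, minimal holder of registered handlers.
type service struct {
	handlers []FailureHandler
}

// OnFailure stores the handler so it can be called on later failures.
func (s *service) OnFailure(h FailureHandler) {
	s.handlers = append(s.handlers, h)
}

// notify calls every registered handler with the event, in registration order.
func (s *service) notify(e FailureEvent) {
	for _, h := range s.handlers {
		h(e)
	}
}

func main() {
	s := &service{}
	s.OnFailure(func(e FailureEvent) {
		fmt.Printf("pipeline %s failed: %v\n", e.ID, e.Error)
	})
	// Simulate a failure that happens after the pipeline was started.
	s.notify(FailureEvent{ID: "orders", Error: errors.New("destination unreachable")})
}
```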
func (*Service) RemoveConnector ¶
func (s *Service) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
RemoveConnector removes a connector from a pipeline.
func (*Service) RemoveProcessor ¶
func (s *Service) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
RemoveProcessor removes a processor from a pipeline.
func (*Service) Run ¶ added in v0.4.0
func (s *Service) Run(
	ctx context.Context,
	connFetcher ConnectorFetcher,
	procService ProcessorService,
	pluginFetcher PluginDispenserFetcher,
) error
Run starts the pipelines whose state in the store was running.
func (*Service) Start ¶
func (s *Service) Start(
	ctx context.Context,
	connFetcher ConnectorFetcher,
	procService ProcessorService,
	pluginFetcher PluginDispenserFetcher,
	pipelineID string,
) error
Start builds and starts a pipeline with the given ID. If the pipeline is already running, Start returns ErrPipelineRunning.
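A caller that wants to treat "already running" as a non-error can check for the sentinel. The sketch below uses a hypothetical `fakeService` with a simplified `Start(id)` signature and the standard `errors` package; whether the real Start wraps ErrPipelineRunning or returns it directly is not stated here, so the wrapping with `%w` is an assumption. `errors.Is` handles both cases.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the package's sentinel error.
var ErrPipelineRunning = errors.New("pipeline is running")

// fakeService is a hypothetical in-memory substitute for Service.
type fakeService struct {
	running map[string]bool
}

// Start marks the pipeline as running, or fails if it already is.
func (s *fakeService) Start(id string) error {
	if s.running[id] {
		// Wrapping is an assumption; errors.Is also matches an unwrapped value.
		return fmt.Errorf("could not start pipeline %q: %w", id, ErrPipelineRunning)
	}
	s.running[id] = true
	return nil
}

// ensureStarted is a hypothetical helper that ignores ErrPipelineRunning.
func ensureStarted(s *fakeService, id string) error {
	err := s.Start(id)
	if errors.Is(err, ErrPipelineRunning) {
		return nil // already running, nothing to do
	}
	return err
}

func main() {
	s := &fakeService{running: map[string]bool{}}
	fmt.Println(s.Start("orders"))
	fmt.Println(s.Start("orders"))
	fmt.Println(ensureStarted(s, "orders"))
}
```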
func (*Service) Stop ¶
func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error
Stop attempts to gracefully stop the given pipeline by calling each node's Stop function. If force is true, the pipeline does not stop gracefully; instead, the context for all nodes is canceled, which causes them to stop running as soon as possible.
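The difference between the two modes can be modeled in a few lines. This is a simplified, single-node sketch and not the package's node implementation: `run`, `stop`, and `demo` are hypothetical names, a graceful stop is modeled by closing the input queue so that pending messages are drained, and a forced stop is modeled by canceling the context.

```go
package main

import (
	"context"
	"fmt"
)

// run processes messages until the queue is closed and drained (graceful
// stop) or until ctx is canceled (forced stop). It returns the number of
// messages it processed.
func run(ctx context.Context, queue <-chan int) int {
	processed := 0
	for {
		// Check cancellation first so a forced stop wins over pending work.
		select {
		case <-ctx.Done():
			return processed
		default:
		}
		select {
		case <-ctx.Done():
			return processed
		case _, ok := <-queue:
			if !ok {
				return processed // queue closed and fully drained
			}
			processed++
		}
	}
}

// stop models the two modes: force cancels the context, otherwise the input
// is closed so that already queued messages are still processed.
func stop(force bool, cancel context.CancelFunc, queue chan int) {
	if force {
		cancel()
		return
	}
	close(queue)
}

// demo queues three messages, requests a stop, then runs the node.
func demo(force bool) int {
	queue := make(chan int, 3)
	for i := 0; i < 3; i++ {
		queue <- i
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stop(force, cancel, queue)
	return run(ctx, queue)
}

func main() {
	fmt.Println("graceful:", demo(false))
	fmt.Println("forced:", demo(true))
}
```

In this model the graceful stop processes all three queued messages, while the forced stop returns before processing any of them.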
func (*Service) StopAll ¶
func (s *Service) StopAll(ctx context.Context, reason error)
StopAll asks all pipelines to stop gracefully (i.e. existing messages get processed but no new messages get produced).
func (*Service) Wait ¶
func (s *Service) Wait(timeout time.Duration) error
Wait blocks until all pipelines are stopped or until the timeout is reached. Returns:
(1) nil if all the pipelines were stopped gracefully,
(2) an error if the pipelines could not be stopped gracefully,
(3) ErrTimeout if the pipelines were not stopped within the given timeout.
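The three outcomes listed above can be reproduced with a common Go pattern: run the blocking wait in a goroutine and race its result against a timer. This is a sketch of that general pattern, not the package's implementation; `waitTimeout`, `errNotGraceful`, `shortTimeout`, and `longTimeout` are hypothetical names, and ErrTimeout is a local stand-in.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Stand-in for the package's sentinel error.
var ErrTimeout = errors.New("operation timed out")

// errNotGraceful is a hypothetical error for outcome (2).
var errNotGraceful = errors.New("pipeline stopped with an error")

const (
	shortTimeout = 10 * time.Millisecond
	longTimeout  = time.Second
)

// waitTimeout runs wait in a goroutine and returns its result, or ErrTimeout
// if wait does not finish within the timeout.
func waitTimeout(wait func() error, timeout time.Duration) error {
	// Buffered so the goroutine can still deliver its result and exit
	// after the caller has given up.
	done := make(chan error, 1)
	go func() { done <- wait() }()

	select {
	case err := <-done:
		return err // outcome (1) when nil, outcome (2) otherwise
	case <-time.After(timeout):
		return ErrTimeout // outcome (3)
	}
}

func main() {
	fmt.Println(waitTimeout(func() error { return nil }, longTimeout))
	fmt.Println(waitTimeout(func() error { return errNotGraceful }, longTimeout))

	block := make(chan struct{})
	defer close(block) // release the blocked goroutine on exit
	fmt.Println(waitTimeout(func() error { <-block; return nil }, shortTimeout))
}
```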
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store handles the persistence and fetching of pipeline instances.
func (*Store) Delete ¶
Delete deletes the instance stored under the key id. It returns nil on success and an error otherwise.