Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type DLQ
- type Instance
- type ProvisionType
- type Service
- func (s *Service) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
- func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
- func (s *Service) Check(ctx context.Context) error
- func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error)
- func (s *Service) Delete(ctx context.Context, pipelineID string) error
- func (s *Service) Get(_ context.Context, id string) (*Instance, error)
- func (s *Service) Init(ctx context.Context) error
- func (s *Service) List(context.Context) map[string]*Instance
- func (s *Service) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
- func (s *Service) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
- func (s *Service) Update(ctx context.Context, pipelineID string, cfg Config) (*Instance, error)
- func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*Instance, error)
- func (s *Service) UpdateStatus(ctx context.Context, id string, status Status, errMsg string) error
- type Status
- type Store
Constants ¶
const ( IDLengthLimit = 128 NameLengthLimit = 128 DescriptionLengthLimit = 8192 )
Variables ¶
var ( ErrTimeout = cerrors.New("operation timed out") ErrGracefulShutdown = cerrors.New("graceful shutdown") ErrForceStop = cerrors.New("force stop") ErrPipelineRunning = cerrors.New("pipeline is running") ErrPipelineNotRunning = cerrors.New("pipeline not running") ErrInstanceNotFound = cerrors.New("pipeline instance not found") ErrNameMissing = cerrors.New("must provide a pipeline name") ErrIDMissing = cerrors.New("must provide a pipeline ID") ErrNameAlreadyExists = cerrors.New("pipeline name already exists") ErrInvalidCharacters = cerrors.New("pipeline ID contains invalid characters") ErrNameOverLimit = cerrors.New("pipeline name is over the character limit (64)") ErrIDOverLimit = cerrors.New("pipeline ID is over the character limit (64)") ErrDescriptionOverLimit = cerrors.New("pipeline description is over the character limit (8192)") ErrConnectorIDNotFound = cerrors.New("connector ID not found") ErrProcessorIDNotFound = cerrors.New("processor ID not found") )
var DefaultDLQ = DLQ{ Plugin: "builtin:log", Settings: map[string]string{ "level": "warn", "message": "record delivery failed", }, WindowSize: 1, WindowNackThreshold: 0, }
Functions ¶
This section is empty.
Types ¶
type Instance ¶
type Instance struct { ID string Config Config Error string CreatedAt time.Time UpdatedAt time.Time ProvisionedBy ProvisionType DLQ DLQ ConnectorIDs []string ProcessorIDs []string // contains filtered or unexported fields }
Instance manages a collection of Connectors, which can be either Destination or Source. The pipeline sets up its publishers and subscribers based on whether the Connector in question is a Destination or a Source.
type ProvisionType ¶ added in v0.3.0
type ProvisionType int
ProvisionType identifies how a pipeline was provisioned: through the API (ProvisionTypeAPI) or from configuration (ProvisionTypeConfig).
const ( ProvisionTypeAPI ProvisionType = iota ProvisionTypeConfig )
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages pipelines.
func NewService ¶
NewService initializes and returns a pipeline Service.
func (*Service) AddConnector ¶
func (s *Service) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
AddConnector adds a connector to a pipeline.
func (*Service) AddProcessor ¶
func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
AddProcessor adds a processor to a pipeline.
func (*Service) Create ¶
func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error)
Create creates a new pipeline instance with the given config and returns it if it was successfully saved to the database; otherwise it returns an error.
func (*Service) Init ¶
Init fetches instances from the store without running any. Connectors and processors should be initialized before calling this function.
func (*Service) RemoveConnector ¶
func (s *Service) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)
RemoveConnector removes a connector from a pipeline.
func (*Service) RemoveProcessor ¶
func (s *Service) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)
RemoveProcessor removes a processor from a pipeline.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store handles the persistence and fetching of pipeline instances.
func (*Store) Delete ¶
Delete deletes the instance stored under the key id. It returns nil on success, or an error otherwise.