pipeline

package
v0.13.0-nightly.20241022 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Length limits for the ID, name and description of a pipeline. The package's
// ErrIDOverLimit, ErrNameOverLimit and ErrDescriptionOverLimit errors report
// that the corresponding field is over its character limit.
// NOTE(review): the code enforcing these limits is not visible here, so it is
// unclear whether length is counted in bytes or in runes - confirm.
const (
	IDLengthLimit          = 128
	NameLengthLimit        = 128
	DescriptionLengthLimit = 8192
)

Variables

View Source
// Sentinel errors exposed by the pipeline package.
//
// The limits quoted in the ErrNameOverLimit, ErrIDOverLimit and
// ErrDescriptionOverLimit messages must stay in sync with NameLengthLimit,
// IDLengthLimit and DescriptionLengthLimit.
// NOTE(review): cerrors is presumably the project's errors package and these
// values are presumably meant to be matched with errors.Is - confirm.
var (
	ErrTimeout               = cerrors.New("operation timed out")
	ErrGracefulShutdown      = cerrors.New("graceful shutdown")
	ErrForceStop             = cerrors.New("force stop")
	ErrPipelineCannotRecover = cerrors.New("pipeline couldn't be recovered")
	ErrPipelineRunning       = cerrors.New("pipeline is running")
	ErrPipelineNotRunning    = cerrors.New("pipeline not running")
	ErrInstanceNotFound      = cerrors.New("pipeline instance not found")
	ErrNameMissing           = cerrors.New("must provide a pipeline name")
	ErrIDMissing             = cerrors.New("must provide a pipeline ID")
	ErrNameAlreadyExists     = cerrors.New("pipeline name already exists")
	ErrInvalidCharacters     = cerrors.New("pipeline ID contains invalid characters")
	ErrNameOverLimit         = cerrors.New("pipeline name is over the character limit (128)")
	ErrIDOverLimit           = cerrors.New("pipeline ID is over the character limit (128)")
	ErrDescriptionOverLimit  = cerrors.New("pipeline description is over the character limit (8192)")
	ErrConnectorIDNotFound   = cerrors.New("connector ID not found")
	ErrProcessorIDNotFound   = cerrors.New("processor ID not found")
)
View Source
// DefaultDLQ is a DLQ value that uses the "builtin:log" plugin with the
// settings level "warn" and message "record delivery failed", a WindowSize of
// 1 and a WindowNackThreshold of 0.
// NOTE(review): presumably applied to pipelines that do not configure their
// own DLQ; the code that uses it is not visible here - confirm.
var DefaultDLQ = DLQ{
	Plugin: "builtin:log",
	Settings: map[string]string{
		"level":   "warn",
		"message": "record delivery failed",
	},
	WindowSize:          1,
	WindowNackThreshold: 0,
}

Functions

This section is empty.

Types

type Config

// Config holds configuration data for building a pipeline: its name and its
// description.
type Config struct {
	Name        string
	Description string
}

Config holds configuration data for building a pipeline.

type DLQ added in v0.4.0

// DLQ holds the DLQ configuration of a pipeline (see Instance.DLQ and
// Service.UpdateDLQ): a plugin name, the settings passed to that plugin and
// two window parameters.
// NOTE(review): DLQ presumably stands for dead-letter queue, and
// WindowNackThreshold presumably is the number of nacks tolerated within the
// last WindowSize records - confirm against the implementation.
type DLQ struct {
	Plugin   string
	Settings map[string]string

	WindowSize          int
	WindowNackThreshold int
}

type Instance

// Instance manages a collection of Connectors, each of which can be either a
// Destination or a Source. It carries the pipeline's ID, Config, DLQ
// configuration, creation and update timestamps, how it was provisioned, and
// the IDs of the connectors and processors attached to it.
// NOTE(review): Error presumably holds the message of the last error that
// affected the pipeline (compare the errMsg parameter of
// Service.UpdateStatus) - confirm.
type Instance struct {
	ID            string
	Config        Config
	Error         string
	CreatedAt     time.Time
	UpdatedAt     time.Time
	ProvisionedBy ProvisionType
	DLQ           DLQ

	ConnectorIDs []string
	ProcessorIDs []string
	// contains filtered or unexported fields
}

Instance manages a collection of Connectors, each of which can be either a Destination or a Source. The pipeline sets up its publishers and subscribers based on whether the Connector in question is a Destination or a Source.

func (*Instance) GetStatus added in v0.12.0

func (p *Instance) GetStatus() Status

func (*Instance) SetStatus added in v0.12.0

func (p *Instance) SetStatus(s Status)

type ProvisionType added in v0.3.0

// ProvisionType defines the provisioning type. It is the type of the
// Instance.ProvisionedBy field and of the p parameter of Service.Create.
type ProvisionType int

ProvisionType defines the provisioning type.

// Known provisioning types. ProvisionTypeAPI is 0 (the zero value of
// ProvisionType) and ProvisionTypeConfig is 1.
// NOTE(review): presumably these distinguish pipelines created through the
// API from pipelines created from a configuration file - confirm.
const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
)

type Service

// Service manages pipelines. Use NewService to create one.
type Service struct {
	// contains filtered or unexported fields
}

Service manages pipelines.

func NewService

func NewService(logger log.CtxLogger, db database.DB) *Service

NewService initializes and returns a pipeline Service.

func (*Service) AddConnector

func (s *Service) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)

AddConnector adds a connector to a pipeline.

func (*Service) AddProcessor

func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)

AddProcessor adds a processor to a pipeline.

func (*Service) Check added in v0.5.1

func (s *Service) Check(ctx context.Context) error

func (*Service) Create

func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error)

Create creates a new pipeline instance with the given config and returns it if it was successfully saved to the database.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, pipelineID string) error

Delete removes a pipeline instance from the Service.

func (*Service) Get

func (s *Service) Get(_ context.Context, id string) (*Instance, error)

Get will return a single pipeline instance or an error.

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init fetches instances from the store without running any. Connectors and processors should be initialized before calling this function.

func (*Service) List

func (s *Service) List(context.Context) map[string]*Instance

List returns all pipeline instances in the Service.

func (*Service) RemoveConnector

func (s *Service) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*Instance, error)

RemoveConnector removes a connector from a pipeline.

func (*Service) RemoveProcessor

func (s *Service) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*Instance, error)

RemoveProcessor removes a processor from a pipeline.

func (*Service) Update

func (s *Service) Update(ctx context.Context, pipelineID string, cfg Config) (*Instance, error)

Update will update a pipeline instance config.

func (*Service) UpdateDLQ added in v0.4.0

func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*Instance, error)

UpdateDLQ will update a pipeline DLQ config.

func (*Service) UpdateStatus added in v0.12.0

func (s *Service) UpdateStatus(ctx context.Context, id string, status Status, errMsg string) error

UpdateStatus updates the status of the pipeline with the given ID.

type Status

// Status defines the running status of a pipeline.
type Status int

Status defines the running status of a pipeline.

// Known pipeline statuses. Numbering starts at 1 (iota + 1), so the zero
// value of Status does not correspond to any of the named statuses.
const (
	StatusRunning Status = iota + 1
	StatusSystemStopped
	StatusUserStopped
	StatusDegraded
	StatusRecovering
)

func (Status) String

func (i Status) String() string

type Store

// Store handles the persistence and fetching of pipeline instances. Use
// NewStore to create one.
type Store struct {
	// contains filtered or unexported fields
}

Store handles the persistence and fetching of pipeline instances.

func NewStore

func NewStore(db database.DB) *Store

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete deletes the instance stored under the key id and returns nil on success, or an error otherwise.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*Instance, error)

Get will return the pipeline instance for a given id or an error.

func (*Store) GetAll

func (s *Store) GetAll(ctx context.Context) (map[string]*Instance, error)

GetAll returns all instances stored in the database.

func (*Store) Set

func (s *Store) Set(ctx context.Context, id string, instance *Instance) error

Set stores the instance under the key id and returns nil on success, or an error otherwise.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL