provisioning

package
v0.6.0-nightly.20230301 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Status values. Presumably these are the accepted values of
// PipelineConfig.Status — confirm against the validation code.
const (
	StatusRunning = "running"
	StatusStopped = "stopped"
)

// Type values. Presumably these are the accepted values of
// ConnectorConfig.Type — confirm against the validation code.
const (
	TypeSource      = "source"
	TypeDestination = "destination"
)

Variables

View Source
// Sentinel errors exported by this package. Presumably they are returned
// (possibly wrapped) by ValidatePipelinesConfig and the parser — confirm
// against the callers before relying on that.
var (
	// ErrMandatoryField carries the message "mandatory field not specified".
	ErrMandatoryField       = cerrors.New("mandatory field not specified")
	// ErrInvalidField carries the message "invalid field value".
	ErrInvalidField         = cerrors.New("invalid field value")
	// ErrDuplicatedPipelineID carries the message "duplicated pipeline ID".
	ErrDuplicatedPipelineID = cerrors.New("duplicated pipeline ID")
)

Functions

func EnrichPipelinesConfig

func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig

EnrichPipelinesConfig sets default values for pipeline config fields.

func ValidatePipelinesConfig

func ValidatePipelinesConfig(cfg PipelineConfig) error

ValidatePipelinesConfig validates config field values for a pipeline.

Types

type ConnectorConfig

// ConnectorConfig is the YAML representation of a single connector. It is the
// value type of PipelineConfig.Connectors.
type ConnectorConfig struct {
	// Type is read from the "type" key; presumably one of TypeSource or
	// TypeDestination — confirm against the validation code.
	Type       string                     `yaml:"type"`
	// Plugin is read from the "plugin" key.
	Plugin     string                     `yaml:"plugin"`
	// Name is read from the "name" key.
	Name       string                     `yaml:"name"`
	// Settings is a string-to-string map read from the "settings" key.
	Settings   map[string]string          `yaml:"settings"`
	// Processors is read from the "processors" key and is omitted from
	// marshaled YAML when empty. NOTE(review): map keys are presumably
	// processor IDs — confirm.
	Processors map[string]ProcessorConfig `yaml:"processors,omitempty"`
}

type ConnectorService

// ConnectorService is the connector-management dependency accepted by
// NewService. Every method takes a context and addresses a connector by its
// string ID; the exact semantics are defined by the implementation.
type ConnectorService interface {
	// Get returns a connector instance for the given id, or an error.
	Get(ctx context.Context, id string) (*connector.Instance, error)
	// Create builds a connector instance from the given id, type, plugin,
	// owning pipeline ID, config and provision type.
	Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error)
	// Delete takes the id of a connector and reports failure via the error.
	Delete(ctx context.Context, id string) error

	// AddProcessor and RemoveProcessor take a connector ID and a processor ID
	// and return the resulting connector instance. Presumably they attach and
	// detach the processor — confirm against the implementation.
	AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
	RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)

	// SetState takes an arbitrary state value for the connector with the given
	// id. NOTE(review): the expected concrete type of state is not visible
	// here — confirm with the implementation.
	SetState(ctx context.Context, id string, state any) (*connector.Instance, error)
}

type DLQConfig added in v0.4.0

// DLQConfig is the YAML representation stored under a pipeline's
// "dead-letter-queue" key (see PipelineConfig.DLQ).
type DLQConfig struct {
	// Plugin is read from the "plugin" key.
	Plugin              string            `yaml:"plugin"`
	// Settings is a string-to-string map read from the "settings" key.
	Settings            map[string]string `yaml:"settings"`
	// WindowSize is read from the "window-size" key. It is a pointer,
	// presumably so that an absent value can be told apart from 0 — confirm.
	WindowSize          *int              `yaml:"window-size"`
	// WindowNackThreshold is read from the "window-nack-threshold" key. It is
	// a pointer, presumably so that an absent value can be told apart from 0
	// — confirm.
	WindowNackThreshold *int              `yaml:"window-nack-threshold"`
}

type Parser added in v0.5.0

// Parser is created with NewParser (which takes a log.CtxLogger). Its Parse
// method takes a path and raw bytes and returns a map[string]PipelineConfig.
type Parser struct {
	// contains filtered or unexported fields
}

func NewParser added in v0.5.0

func NewParser(logger log.CtxLogger) *Parser

func (*Parser) Parse added in v0.5.0

func (p *Parser) Parse(ctx context.Context, path string, data []byte) (map[string]PipelineConfig, error)

type PipelineConfig

// PipelineConfig is the YAML representation of a single pipeline. It is the
// value type of PipelinesConfig.Pipelines and of the maps handled by
// EnrichPipelinesConfig and Parser.Parse.
type PipelineConfig struct {
	// Status is read from the "status" key; presumably one of StatusRunning or
	// StatusStopped — confirm against the validation code.
	Status      string                     `yaml:"status"`
	// Name is read from the "name" key.
	Name        string                     `yaml:"name"`
	// Description is read from the "description" key.
	Description string                     `yaml:"description"`
	// Connectors is read from the "connectors" key and is omitted from
	// marshaled YAML when empty. NOTE(review): map keys are presumably
	// connector IDs — confirm.
	Connectors  map[string]ConnectorConfig `yaml:"connectors,omitempty"`
	// Processors is read from the "processors" key and is omitted from
	// marshaled YAML when empty. NOTE(review): map keys are presumably
	// processor IDs — confirm.
	Processors  map[string]ProcessorConfig `yaml:"processors,omitempty"`
	// DLQ is read from the "dead-letter-queue" key.
	DLQ         DLQConfig                  `yaml:"dead-letter-queue"`
}

type PipelineService

// PipelineService is the pipeline-management dependency accepted by
// NewService. Every method takes a context; the exact semantics are defined
// by the implementation.
type PipelineService interface {
	// Start takes connector, processor and plugin-dispenser fetchers together
	// with the ID of the pipeline to act on.
	Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procFetcher pipeline.ProcessorFetcher, pluginFetcher pipeline.PluginDispenserFetcher, pipelineID string) error
	// Get returns a pipeline instance for the given id, or an error.
	Get(ctx context.Context, id string) (*pipeline.Instance, error)
	// List returns pipeline instances in a map with string keys. NOTE(review):
	// keys are presumably pipeline IDs — confirm.
	List(ctx context.Context) map[string]*pipeline.Instance
	// Create builds a pipeline instance from the given id, config and
	// provision type.
	Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error)
	// Delete takes the ID of a pipeline and reports failure via the error.
	Delete(ctx context.Context, pipelineID string) error
	// Stop takes the ID of a pipeline and a force flag. NOTE(review): force
	// presumably requests a non-graceful stop — confirm.
	Stop(ctx context.Context, pipelineID string, force bool) error
	// UpdateDLQ takes a pipeline ID and a pipeline.DLQ value and returns the
	// resulting pipeline instance.
	UpdateDLQ(ctx context.Context, pipelineID string, cfg pipeline.DLQ) (*pipeline.Instance, error)

	// The Add*/Remove* methods take a pipeline ID plus a connector or
	// processor ID and return the resulting pipeline instance. Presumably they
	// attach and detach the referenced entity — confirm.
	AddConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	AddProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
	RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
}

type PipelinesConfig

// PipelinesConfig is the top-level YAML document shape: a version string and
// a map of pipeline configurations.
type PipelinesConfig struct {
	// Version is read from the "version" key.
	Version   string                    `yaml:"version"`
	// Pipelines is read from the "pipelines" key. NOTE(review): map keys are
	// presumably pipeline IDs (cf. ErrDuplicatedPipelineID) — confirm.
	Pipelines map[string]PipelineConfig `yaml:"pipelines"`
}

type PluginService added in v0.5.0

// PluginService is the plugin dependency accepted by NewService.
type PluginService interface {
	// NewDispenser takes a logger and a name and returns a plugin.Dispenser or
	// an error. Presumably name identifies the plugin to dispense — confirm.
	//
	// The first parameter is a log.CtxLogger, not a context.Context, so it is
	// named logger; by Go convention the name ctx is reserved for contexts.
	NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
}

type ProcessorConfig

// ProcessorConfig is the YAML representation of a single processor. It is the
// value type of the Processors maps on PipelineConfig and ConnectorConfig.
type ProcessorConfig struct {
	// Type is read from the "type" key.
	Type     string            `yaml:"type"`
	// Settings is a string-to-string map read from the "settings" key.
	Settings map[string]string `yaml:"settings"`
}

type ProcessorService

// ProcessorService is the processor-management dependency accepted by
// NewService. Every method takes a context and addresses a processor by its
// string ID; the exact semantics are defined by the implementation.
type ProcessorService interface {
	// Get returns a processor instance for the given id, or an error.
	Get(ctx context.Context, id string) (*processor.Instance, error)
	// Create builds a processor instance from the given id, processor type,
	// parent, config and provision type.
	Create(ctx context.Context, id string, procType string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType) (*processor.Instance, error)
	// Delete takes the id of a processor and reports failure via the error.
	Delete(ctx context.Context, id string) error
}

type Service

// Service is constructed with NewService, which takes a database, a logger,
// the pipeline/connector/processor/plugin services and a pipelines directory.
// Its Init method performs the provisioning.
type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(
	db database.DB,
	logger log.CtxLogger,
	plService PipelineService,
	connService ConnectorService,
	procService ProcessorService,
	pluginService PluginService,
	pipelinesDir string,
) *Service

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init provisions the pipelines defined in the pipelinePath directory. The pipeline service should be initialized before calling this function, and all pipelines should be stopped.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL