provisioning

package
v0.5.0-nightly.20230113 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusRunning   = "running"
	StatusStopped   = "stopped"
	TypeSource      = "source"
	TypeDestination = "destination"
)

Variables

View Source
var (
	ErrMandatoryField       = cerrors.New("mandatory field not specified")
	ErrInvalidField         = cerrors.New("invalid field value")
	ErrDuplicatedPipelineID = cerrors.New("duplicated pipeline ID")
)

Functions

func EnrichPipelinesConfig

func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig

EnrichPipelinesConfig sets default values for pipeline config fields

func ValidatePipelinesConfig

func ValidatePipelinesConfig(cfg PipelineConfig) error

ValidatePipelinesConfig validates config field values for a pipeline

Types

type ConnectorConfig

type ConnectorConfig struct {
	Type       string                     `yaml:"type"`
	Plugin     string                     `yaml:"plugin"`
	Name       string                     `yaml:"name"`
	Settings   map[string]string          `yaml:"settings"`
	Processors map[string]ProcessorConfig `yaml:"processors,omitempty"`
}

type ConnectorService

type ConnectorService interface {
	Get(ctx context.Context, id string) (connector.Connector, error)
	Create(ctx context.Context, id string, t connector.Type, c connector.Config, p connector.ProvisionType) (connector.Connector, error)
	Delete(ctx context.Context, id string) error

	AddProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error)
	RemoveProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error)

	SetDestinationState(ctx context.Context, id string, state connector.DestinationState) (connector.Destination, error)
	SetSourceState(ctx context.Context, id string, state connector.SourceState) (connector.Source, error)
}

type DLQConfig added in v0.4.0

type DLQConfig struct {
	Plugin              string            `yaml:"plugin"`
	Settings            map[string]string `yaml:"settings"`
	WindowSize          *int              `yaml:"window-size"`
	WindowNackThreshold *int              `yaml:"window-nack-threshold"`
}

type Parser added in v0.5.0

type Parser struct {
	// contains filtered or unexported fields
}

func NewParser added in v0.5.0

func NewParser(logger log.CtxLogger) *Parser

func (*Parser) Parse added in v0.5.0

func (p *Parser) Parse(ctx context.Context, path string, data []byte) (map[string]PipelineConfig, error)

type PipelineConfig

type PipelineConfig struct {
	Status      string                     `yaml:"status"`
	Name        string                     `yaml:"name"`
	Description string                     `yaml:"description"`
	Connectors  map[string]ConnectorConfig `yaml:"connectors,omitempty"`
	Processors  map[string]ProcessorConfig `yaml:"processors,omitempty"`
	DLQ         DLQConfig                  `yaml:"dead-letter-queue"`
}

type PipelineService

type PipelineService interface {
	Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procFetcher pipeline.ProcessorFetcher, pipelineID string) error
	Get(ctx context.Context, id string) (*pipeline.Instance, error)
	List(ctx context.Context) map[string]*pipeline.Instance
	Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error)
	Delete(ctx context.Context, pipelineID string) error
	Stop(ctx context.Context, pipelineID string) error
	UpdateDLQ(ctx context.Context, pipelineID string, cfg pipeline.DLQ) (*pipeline.Instance, error)

	AddConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	AddProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
	RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
}

type PipelinesConfig

type PipelinesConfig struct {
	Version   string                    `yaml:"version"`
	Pipelines map[string]PipelineConfig `yaml:"pipelines"`
}

type ProcessorConfig

type ProcessorConfig struct {
	Type     string            `yaml:"type"`
	Settings map[string]string `yaml:"settings"`
}

type ProcessorService

type ProcessorService interface {
	Get(ctx context.Context, id string) (*processor.Instance, error)
	Create(ctx context.Context, id string, procType string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType) (*processor.Instance, error)
	Delete(ctx context.Context, id string) error
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(
	db database.DB,
	logger log.CtxLogger,
	plService PipelineService,
	connService ConnectorService,
	procService ProcessorService,
	pipelinesDir string,
) *Service

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init provision pipelines defined in pipelinePath directory. should initialize pipeline service before calling this function, and all pipelines should be stopped.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL