provisioning

package
v0.4.0-nightly.20221126 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2022 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusRunning   = "running"
	StatusStopped   = "stopped"
	TypeSource      = "source"
	TypeDestination = "destination"
)
View Source
const ParserVersion = "1.0"

Variables

View Source
var (
	ErrUnsupportedVersion   = cerrors.New("unsupported parser version")
	ErrMandatoryField       = cerrors.New("mandatory field not specified")
	ErrInvalidField         = cerrors.New("invalid field value")
	ErrDuplicatedPipelineID = cerrors.New("duplicated pipeline ID")
)

Functions

func EnrichPipelinesConfig

func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig

EnrichPipelinesConfig sets default values for pipeline config fields

func Parse

func Parse(data []byte) (map[string]PipelineConfig, error)

func ValidatePipelinesConfig

func ValidatePipelinesConfig(cfg PipelineConfig) error

ValidatePipelinesConfig validates config field values for a pipeline

Types

type ConnectorConfig

type ConnectorConfig struct {
	Type       string                     `yaml:"type"`
	Plugin     string                     `yaml:"plugin"`
	Name       string                     `yaml:"name"`
	Settings   map[string]string          `yaml:"settings"`
	Processors map[string]ProcessorConfig `yaml:"processors,omitempty"`
}

type ConnectorService

type ConnectorService interface {
	Get(ctx context.Context, id string) (connector.Connector, error)
	Create(ctx context.Context, id string, t connector.Type, c connector.Config, p connector.ProvisionType) (connector.Connector, error)
	Delete(ctx context.Context, id string) error

	AddProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error)
	RemoveProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error)

	SetDestinationState(ctx context.Context, id string, state connector.DestinationState) (connector.Destination, error)
	SetSourceState(ctx context.Context, id string, state connector.SourceState) (connector.Source, error)
}

type PipelineConfig

type PipelineConfig struct {
	Status      string                     `yaml:"status"`
	Name        string                     `yaml:"name"`
	Description string                     `yaml:"description"`
	Connectors  map[string]ConnectorConfig `yaml:"connectors,omitempty"`
	Processors  map[string]ProcessorConfig `yaml:"processors,omitempty"`
}

type PipelineService

type PipelineService interface {
	Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procFetcher pipeline.ProcessorFetcher, pipelineID string) error
	Get(ctx context.Context, id string) (*pipeline.Instance, error)
	List(ctx context.Context) map[string]*pipeline.Instance
	Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error)
	Delete(ctx context.Context, pipelineID string) error
	Stop(ctx context.Context, pipelineID string) error

	AddConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	AddProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
	RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
}

type PipelinesConfig

type PipelinesConfig struct {
	Version   string                    `yaml:"version"`
	Pipelines map[string]PipelineConfig `yaml:"pipelines"`
}

type ProcessorConfig

type ProcessorConfig struct {
	Type     string            `yaml:"type"`
	Settings map[string]string `yaml:"settings"`
}

type ProcessorService

type ProcessorService interface {
	Get(ctx context.Context, id string) (*processor.Instance, error)
	Create(ctx context.Context, id string, procType string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType) (*processor.Instance, error)
	Delete(ctx context.Context, id string) error
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(
	db database.DB,
	logger log.CtxLogger,
	plService PipelineService,
	connService ConnectorService,
	procService ProcessorService,
	pipelinesDir string,
) *Service

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init provision pipelines defined in pipelinePath directory. should initialize pipeline service before calling this function, and all pipelines should be stopped.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL