Documentation ¶
Index ¶
- Constants
- Variables
- func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig
- func Parse(data []byte) (map[string]PipelineConfig, error)
- func ValidatePipelinesConfig(cfg PipelineConfig) error
- type ConnectorConfig
- type ConnectorService
- type PipelineConfig
- type PipelineService
- type PipelinesConfig
- type ProcessorConfig
- type ProcessorService
- type Service
Constants ¶
View Source
const ( StatusRunning = "running" StatusStopped = "stopped" TypeSource = "source" TypeDestination = "destination" )
View Source
const ParserVersion = "1.0"
Variables ¶
Functions ¶
func EnrichPipelinesConfig ¶
func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig
EnrichPipelinesConfig sets default values for pipeline config fields
func ValidatePipelinesConfig ¶
func ValidatePipelinesConfig(cfg PipelineConfig) error
ValidatePipelinesConfig validates config field values for a pipeline
Types ¶
type ConnectorConfig ¶
type ConnectorService ¶
type ConnectorService interface { Get(ctx context.Context, id string) (connector.Connector, error) Create(ctx context.Context, id string, t connector.Type, c connector.Config, p connector.ProvisionType) (connector.Connector, error) Delete(ctx context.Context, id string) error AddProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error) RemoveProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error) SetDestinationState(ctx context.Context, id string, state connector.DestinationState) (connector.Destination, error) SetSourceState(ctx context.Context, id string, state connector.SourceState) (connector.Source, error) }
type PipelineConfig ¶
type PipelineConfig struct { Status string `yaml:"status"` Name string `yaml:"name"` Description string `yaml:"description"` Connectors map[string]ConnectorConfig `yaml:"connectors,omitempty"` Processors map[string]ProcessorConfig `yaml:"processors,omitempty"` }
type PipelineService ¶
type PipelineService interface { Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procFetcher pipeline.ProcessorFetcher, pipelineID string) error Get(ctx context.Context, id string) (*pipeline.Instance, error) List(ctx context.Context) map[string]*pipeline.Instance Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error) Delete(ctx context.Context, pipelineID string) error Stop(ctx context.Context, pipelineID string) error AddConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error) }
type PipelinesConfig ¶
type PipelinesConfig struct { Version string `yaml:"version"` Pipelines map[string]PipelineConfig `yaml:"pipelines"` }
type ProcessorConfig ¶
type ProcessorService ¶
type ProcessorService interface { Get(ctx context.Context, id string) (*processor.Instance, error) Create(ctx context.Context, id string, name string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType) (*processor.Instance, error) Delete(ctx context.Context, id string) error }
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService( db database.DB, logger log.CtxLogger, plService PipelineService, connService ConnectorService, procService ProcessorService, pipelinesDir string, ) *Service
Source Files ¶
Click to show internal directories.
Click to hide internal directories.