Documentation ¶
Index ¶
- Constants
- Variables
- func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig
- func Parse(data []byte) (map[string]PipelineConfig, error)
- func ValidatePipelinesConfig(cfg PipelineConfig) error
- type ConnectorConfig
- type ConnectorService
- type PipelineConfig
- type PipelineService
- type PipelinesConfig
- type ProcessorConfig
- type ProcessorService
- type Service
Constants ¶
View Source
const (
	StatusRunning   = "running"
	StatusStopped   = "stopped"
	TypeSource      = "source"
	TypeDestination = "destination"
)
View Source
const ParserVersion = "1.0"
Variables ¶
Functions ¶
func EnrichPipelinesConfig ¶
func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig
EnrichPipelinesConfig sets default values for pipeline config fields.
func ValidatePipelinesConfig ¶
func ValidatePipelinesConfig(cfg PipelineConfig) error
ValidatePipelinesConfig validates config field values for a pipeline.
Types ¶
type ConnectorConfig ¶
type ConnectorService ¶
type ConnectorService interface {
	Get(ctx context.Context, id string) (connector.Connector, error)
	Create(ctx context.Context, id string, t connector.Type, c connector.Config, p connector.ProvisionType) (connector.Connector, error)
	Delete(ctx context.Context, id string) error
	AddProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error)
	RemoveProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error)
	SetDestinationState(ctx context.Context, id string, state connector.DestinationState) (connector.Destination, error)
	SetSourceState(ctx context.Context, id string, state connector.SourceState) (connector.Source, error)
}
type PipelineConfig ¶
type PipelineConfig struct {
	Status      string                     `yaml:"status"`
	Name        string                     `yaml:"name"`
	Description string                     `yaml:"description"`
	Connectors  map[string]ConnectorConfig `yaml:"connectors,omitempty"`
	Processors  map[string]ProcessorConfig `yaml:"processors,omitempty"`
}
type PipelineService ¶
type PipelineService interface {
	Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procFetcher pipeline.ProcessorFetcher, pipeline *pipeline.Instance) error
	Get(ctx context.Context, id string) (*pipeline.Instance, error)
	List(ctx context.Context) map[string]*pipeline.Instance
	Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error)
	Delete(ctx context.Context, pl *pipeline.Instance) error
	Stop(ctx context.Context, pipeline *pipeline.Instance) error
	AddConnector(ctx context.Context, pl *pipeline.Instance, connectorID string) (*pipeline.Instance, error)
	RemoveConnector(ctx context.Context, pl *pipeline.Instance, connectorID string) (*pipeline.Instance, error)
	AddProcessor(ctx context.Context, pl *pipeline.Instance, processorID string) (*pipeline.Instance, error)
	RemoveProcessor(ctx context.Context, pl *pipeline.Instance, processorID string) (*pipeline.Instance, error)
}
type PipelinesConfig ¶
type PipelinesConfig struct {
	Version   string                    `yaml:"version"`
	Pipelines map[string]PipelineConfig `yaml:"pipelines"`
}
type ProcessorConfig ¶
type ProcessorService ¶
type ProcessorService interface {
	Get(ctx context.Context, id string) (*processor.Instance, error)
	Create(ctx context.Context, id string, name string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType) (*processor.Instance, error)
	Delete(ctx context.Context, id string) error
}
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService(
	db database.DB,
	logger log.CtxLogger,
	plService PipelineService,
	connService ConnectorService,
	procService ProcessorService,
	pipelinesDir string,
) *Service
Source Files ¶
Click to show internal directories.
Click to hide internal directories.