Documentation ¶
Index ¶
- Constants
- Variables
- func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig
- func ValidatePipelinesConfig(cfg PipelineConfig) error
- type ConnectorConfig
- type ConnectorService
- type DLQConfig
- type Parser
- type PipelineConfig
- type PipelineService
- type PipelinesConfig
- type PluginService
- type ProcessorConfig
- type ProcessorService
- type Service
Constants ¶
View Source
// String constants for two status values ("running", "stopped") and two type
// values ("source", "destination").
// NOTE(review): presumably the status values are what PipelineConfig.Status
// accepts and the type values distinguish connector kinds — the fields that
// consume them are not visible in this listing; confirm.
const ( StatusRunning = "running" StatusStopped = "stopped" TypeSource = "source" TypeDestination = "destination" )
Variables ¶
Functions ¶
func EnrichPipelinesConfig ¶
func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig
EnrichPipelinesConfig sets default values for pipeline config fields.
func ValidatePipelinesConfig ¶
func ValidatePipelinesConfig(cfg PipelineConfig) error
ValidatePipelinesConfig validates config field values for a pipeline.
Types ¶
type ConnectorConfig ¶
type ConnectorService ¶
// ConnectorService is the connector dependency accepted by NewService.
// It declares six methods, all taking a context first:
//   - Get and Delete address a connector by id.
//   - Create takes an id, connector type, plugin name, pipeline ID, config and
//     provision type.
//   - AddProcessor and RemoveProcessor take a connector ID and a processor ID.
//   - SetState takes an id and a state value of any type.
// Every method except Delete returns the *connector.Instance together with an
// error; Delete returns only an error.
// NOTE(review): the exact semantics of each call are defined by the
// implementation, which is not visible here.
type ConnectorService interface { Get(ctx context.Context, id string) (*connector.Instance, error) Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error) Delete(ctx context.Context, id string) error AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error) RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error) SetState(ctx context.Context, id string, state any) (*connector.Instance, error) }
type PipelineConfig ¶
// PipelineConfig is the YAML-tagged configuration of a single pipeline.
// Fields map to the YAML keys "status", "name", "description", "connectors",
// "processors" and "dead-letter-queue". Connectors and Processors are maps
// keyed by string and carry omitempty, so they are left out of marshalled
// output when empty.
// NOTE(review): the map keys are presumably connector/processor IDs, and
// Status presumably takes StatusRunning or StatusStopped — confirm against the
// parser and ValidatePipelinesConfig.
type PipelineConfig struct { Status string `yaml:"status"` Name string `yaml:"name"` Description string `yaml:"description"` Connectors map[string]ConnectorConfig `yaml:"connectors,omitempty"` Processors map[string]ProcessorConfig `yaml:"processors,omitempty"` DLQ DLQConfig `yaml:"dead-letter-queue"` }
type PipelineService ¶
// PipelineService is the pipeline dependency accepted by NewService.
// All methods take a context first:
//   - Start takes connector, processor and plugin-dispenser fetchers plus the
//     pipeline ID, and returns only an error.
//   - Get returns one *pipeline.Instance by id; List returns all instances as
//     a map keyed by string (no error result).
//   - Create takes an id, a pipeline.Config and a provision type.
//   - Delete and Stop return only an error; Stop additionally takes a force
//     flag.
//   - UpdateDLQ takes a pipeline ID and a pipeline.DLQ config.
//   - AddConnector/RemoveConnector and AddProcessor/RemoveProcessor take a
//     pipeline ID and the connector or processor ID, and return the
//     *pipeline.Instance with an error.
// NOTE(review): the effect of force in Stop and the key used by List are
// implementation details not visible here — confirm.
type PipelineService interface { Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procFetcher pipeline.ProcessorFetcher, pluginFetcher pipeline.PluginDispenserFetcher, pipelineID string) error Get(ctx context.Context, id string) (*pipeline.Instance, error) List(ctx context.Context) map[string]*pipeline.Instance Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error) Delete(ctx context.Context, pipelineID string) error Stop(ctx context.Context, pipelineID string, force bool) error UpdateDLQ(ctx context.Context, pipelineID string, cfg pipeline.DLQ) (*pipeline.Instance, error) AddConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error) RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error) AddProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error) RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error) }
type PipelinesConfig ¶
// PipelinesConfig groups a version string (YAML key "version") with a map of
// PipelineConfig values keyed by string (YAML key "pipelines").
// NOTE(review): presumably this is the root document of a pipelines
// configuration file and the map keys are pipeline IDs — confirm against the
// parser.
type PipelinesConfig struct { Version string `yaml:"version"` Pipelines map[string]PipelineConfig `yaml:"pipelines"` }
type PluginService ¶ added in v0.5.0
type ProcessorConfig ¶
type ProcessorService ¶
// ProcessorService is the processor dependency accepted by NewService.
// It declares three methods, all taking a context first:
//   - Get returns a *processor.Instance by id.
//   - Create takes an id, a processor type name, a processor.Parent, a
//     processor.Config and a provision type, and returns the created
//     *processor.Instance.
//   - Delete takes an id and returns only an error.
type ProcessorService interface { Get(ctx context.Context, id string) (*processor.Instance, error) Create(ctx context.Context, id string, procType string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType) (*processor.Instance, error) Delete(ctx context.Context, id string) error }
type Service ¶
// Service is the type returned by NewService, which builds it from a database,
// a logger, pipeline/connector/processor/plugin services and a pipelines
// directory path. Its fields are unexported and not shown in this listing.
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService( db database.DB, logger log.CtxLogger, plService PipelineService, connService ConnectorService, procService ProcessorService, pluginService PluginService, pipelinesDir string, ) *Service
Source Files ¶
Click to show internal directories.
Click to hide internal directories.