Documentation ¶
Index ¶
- Constants
- Variables
- func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig
- func ValidatePipelinesConfig(cfg PipelineConfig) error
- type ConnectorConfig
- type ConnectorService
- type DLQConfig
- type Parser
- type PipelineConfig
- type PipelineService
- type PipelinesConfig
- type ProcessorConfig
- type ProcessorService
- type Service
Constants ¶
View Source
const (
	StatusRunning   = "running"
	StatusStopped   = "stopped"
	TypeSource      = "source"
	TypeDestination = "destination"
)
Variables ¶
Functions ¶
func EnrichPipelinesConfig ¶
func EnrichPipelinesConfig(mp map[string]PipelineConfig) map[string]PipelineConfig
EnrichPipelinesConfig sets default values for pipeline config fields.
func ValidatePipelinesConfig ¶
func ValidatePipelinesConfig(cfg PipelineConfig) error
ValidatePipelinesConfig validates config field values for a pipeline.
Types ¶
type ConnectorConfig ¶
type ConnectorService ¶
type ConnectorService interface {
	Get(ctx context.Context, id string) (connector.Connector, error)
	Create(ctx context.Context, id string, t connector.Type, c connector.Config, p connector.ProvisionType) (connector.Connector, error)
	Delete(ctx context.Context, id string) error
	AddProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error)
	RemoveProcessor(ctx context.Context, connectorID string, processorID string) (connector.Connector, error)
	SetDestinationState(ctx context.Context, id string, state connector.DestinationState) (connector.Destination, error)
	SetSourceState(ctx context.Context, id string, state connector.SourceState) (connector.Source, error)
}
type PipelineConfig ¶
type PipelineConfig struct {
	Status      string                     `yaml:"status"`
	Name        string                     `yaml:"name"`
	Description string                     `yaml:"description"`
	Connectors  map[string]ConnectorConfig `yaml:"connectors,omitempty"`
	Processors  map[string]ProcessorConfig `yaml:"processors,omitempty"`
	DLQ         DLQConfig                  `yaml:"dead-letter-queue"`
}
type PipelineService ¶
type PipelineService interface {
	Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procFetcher pipeline.ProcessorFetcher, pipelineID string) error
	Get(ctx context.Context, id string) (*pipeline.Instance, error)
	List(ctx context.Context) map[string]*pipeline.Instance
	Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error)
	Delete(ctx context.Context, pipelineID string) error
	Stop(ctx context.Context, pipelineID string) error
	UpdateDLQ(ctx context.Context, pipelineID string, cfg pipeline.DLQ) (*pipeline.Instance, error)
	AddConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	AddProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
	RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
}
type PipelinesConfig ¶
type PipelinesConfig struct {
	Version   string                    `yaml:"version"`
	Pipelines map[string]PipelineConfig `yaml:"pipelines"`
}
type ProcessorConfig ¶
type ProcessorService ¶
type ProcessorService interface {
	Get(ctx context.Context, id string) (*processor.Instance, error)
	Create(ctx context.Context, id string, procType string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType) (*processor.Instance, error)
	Delete(ctx context.Context, id string) error
}
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService(
	db database.DB,
	logger log.CtxLogger,
	plService PipelineService,
	connService ConnectorService,
	procService ProcessorService,
	pipelinesDir string,
) *Service
Source Files ¶
Click to show internal directories.
Click to hide internal directories.