Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type Connector
- type Destination
- func (d *Destination) Ack(ctx context.Context) (record.Position, error)
- func (d *Destination) Errors() <-chan error
- func (d *Destination) ID() string
- func (d *Destination) Open(ctx context.Context) error
- func (d *Destination) Stop(ctx context.Context, lastPosition record.Position) error
- func (d *Destination) Teardown(ctx context.Context) error
- func (d *Destination) Write(ctx context.Context, r record.Record) error
- type DestinationState
- type Instance
- type PersistCallback
- type Persister
- type PluginDispenserFetcher
- type ProvisionType
- type Service
- func (s *Service) AddProcessor(ctx context.Context, connectorID string, processorID string) (*Instance, error)
- func (s *Service) Check(ctx context.Context) error
- func (s *Service) Create(ctx context.Context, id string, t Type, plugin string, pipelineID string, ...) (*Instance, error)
- func (s *Service) Delete(ctx context.Context, id string) error
- func (s *Service) Get(ctx context.Context, id string) (*Instance, error)
- func (s *Service) Init(ctx context.Context) error
- func (s *Service) List(ctx context.Context) map[string]*Instance
- func (s *Service) RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*Instance, error)
- func (s *Service) SetState(ctx context.Context, id string, state any) (*Instance, error)
- func (s *Service) Update(ctx context.Context, id string, data Config) (*Instance, error)
- type Source
- func (s *Source) Ack(ctx context.Context, p record.Position) error
- func (s *Source) Errors() <-chan error
- func (s *Source) ID() string
- func (s *Source) Open(ctx context.Context) error
- func (s *Source) Read(ctx context.Context) (record.Record, error)
- func (s *Source) Stop(ctx context.Context) (record.Position, error)
- func (s *Source) Teardown(ctx context.Context) error
- type SourceState
- type Store
- func (s *Store) Delete(ctx context.Context, id string) error
- func (s *Store) Get(ctx context.Context, id string) (*Instance, error)
- func (s *Store) GetAll(ctx context.Context) (map[string]*Instance, error)
- func (s *Store) PrepareSet(id string, instance *Instance) (func(context.Context) error, error)
- func (s *Store) Set(ctx context.Context, id string, c *Instance) error
- type Type
Constants ¶
const ( DefaultPersisterDelayThreshold = time.Second DefaultPersisterBundleCountThreshold = 100 )
Variables ¶
var ( ErrInstanceNotFound = cerrors.New("connector instance not found") ErrInvalidConnectorType = cerrors.New("invalid connector type") ErrInvalidConnectorStateType = cerrors.New("invalid connector state type") ErrProcessorIDNotFound = cerrors.New("processor ID not found") ErrConnectorRunning = cerrors.New("connector is running") )
Functions ¶
This section is empty.
Types ¶
type Destination ¶
type Destination struct { Instance *Instance // contains filtered or unexported fields }
func (*Destination) Errors ¶ added in v0.5.0
func (d *Destination) Errors() <-chan error
func (*Destination) ID ¶ added in v0.5.0
func (d *Destination) ID() string
type DestinationState ¶
type Instance ¶ added in v0.5.0
type Instance struct { ID string Type Type Config Config PipelineID string Plugin string ProcessorIDs []string State any ProvisionedBy ProvisionType CreatedAt time.Time UpdatedAt time.Time sync.RWMutex // contains filtered or unexported fields }
Instance represents a connector instance.
func (*Instance) Connector ¶ added in v0.5.0
func (i *Instance) Connector(ctx context.Context, dispenserFetcher PluginDispenserFetcher) (Connector, error)
Connector fetches a new plugin dispenser and returns a connector that can be used to interact with that plugin. If Instance.Type is TypeSource, this method returns *Source; if it is TypeDestination, it returns *Destination; otherwise it returns an error. The plugin is not started in this method; that happens when Open is called on the returned connector. If a connector is already running for this Instance, this method returns an error.
type PersistCallback ¶
type PersistCallback func(error)
PersistCallback is a function that's called when a connector is persisted.
type Persister ¶
type Persister struct {
// contains filtered or unexported fields
}
Persister is responsible for persisting connectors and their state when certain thresholds are met.
func NewPersister ¶
func NewPersister( logger log.CtxLogger, db database.DB, delayThreshold time.Duration, bundleCountThreshold int, ) *Persister
NewPersister creates a new persister that stores data into the supplied database when the thresholds are met.
func (*Persister) ConnectorStarted ¶
func (p *Persister) ConnectorStarted()
ConnectorStarted increases the number of connectors this persister is persisting. As long as at least one connector is started, the Wait function will block, so connectors have to make sure to call ConnectorStopped.
func (*Persister) ConnectorStopped ¶
func (p *Persister) ConnectorStopped()
ConnectorStopped triggers one last flush and decreases the number of connectors this persister is persisting. Once all connectors are stopped the Wait function stops blocking.
func (*Persister) Flush ¶
Flush will trigger a goroutine that persists any in-memory data to the store. To wait for the changes to be actually persisted you need to call Wait.
func (*Persister) Persist ¶
Persist signals the persister that a connector state changed and that it should be persisted with the next batch. This function will collect all changed connectors until either the number of detected changes reaches the configured threshold or the configured delay is reached (whichever comes first). The connectors are then flushed and a new batch starts to be collected.
type PluginDispenserFetcher ¶ added in v0.5.0
type PluginDispenserFetcher interface {
NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
}
PluginDispenserFetcher can fetch a plugin dispenser.
type ProvisionType ¶ added in v0.3.0
type ProvisionType int
ProvisionType defines the provisioning type.
const ( ProvisionTypeAPI ProvisionType = iota ProvisionTypeConfig ProvisionTypeDLQ // used for provisioning DLQ connectors which are not persisted )
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages connectors.
func NewService ¶
NewService creates a Store-backed implementation of Service.
func (*Service) AddProcessor ¶
func (s *Service) AddProcessor(ctx context.Context, connectorID string, processorID string) (*Instance, error)
AddProcessor adds a processor to a connector.
func (*Service) Create ¶
func (s *Service) Create( ctx context.Context, id string, t Type, plugin string, pipelineID string, cfg Config, p ProvisionType, ) (*Instance, error)
Create will create a connector instance, persist it and return it.
func (*Service) List ¶
List returns a map of Instances keyed by their ID. Instances do not necessarily have a running plugin associated with them.
func (*Service) RemoveProcessor ¶
func (s *Service) RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*Instance, error)
RemoveProcessor removes a processor from a connector.
type Source ¶
type Source struct { Instance *Instance // contains filtered or unexported fields }
type SourceState ¶
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store handles the persistence and fetching of connectors.
func (*Store) Delete ¶
Delete deletes the connector stored under the key id. It returns nil on success and an error otherwise.
func (*Store) PrepareSet ¶ added in v0.5.3
PrepareSet encodes the connector instance and returns a function that stores the connector. This can be used to prepare everything needed to store an instance without actually storing it yet.