Documentation ¶
Index ¶
- Constants
- Variables
- type Builder
- type Config
- type Connector
- type DefaultBuilder
- type Destination
- type DestinationState
- type PersistCallback
- type Persister
- type ProvisionType
- type Service
- func (s *Service) AddProcessor(ctx context.Context, connectorID string, processorID string) (Connector, error)
- func (s *Service) Create(ctx context.Context, id string, t Type, cfg Config, p ProvisionType) (Connector, error)
- func (s *Service) Delete(ctx context.Context, id string) error
- func (s *Service) Get(ctx context.Context, id string) (Connector, error)
- func (s *Service) Init(ctx context.Context) error
- func (s *Service) List(ctx context.Context) map[string]Connector
- func (s *Service) RemoveProcessor(ctx context.Context, connectorID string, processorID string) (Connector, error)
- func (s *Service) Update(ctx context.Context, id string, data Config) (Connector, error)
- type Source
- type SourceState
- type Store
- type Type
Constants ¶
const ( DefaultPersisterDelayThreshold = time.Second DefaultPersisterBundleCountThreshold = 100 )
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Builder ¶
type Builder interface { Build(t Type, p ProvisionType) (Connector, error) // Init initializes a connector and validates it to make sure it's ready for use. Init(c Connector, id string, config Config) error }
Builder represents an object that can build a connector. The main use of this interface is to be able to switch out the connector implementations for mocks in tests.
type Config ¶
type Config struct { Name string Settings map[string]string Plugin string PipelineID string ProcessorIDs []string }
Config collects common data stored for a connector.
type Connector ¶
type Connector interface { ID() string Type() Type Config() Config SetConfig(Config) ProvisionedBy() ProvisionType CreatedAt() time.Time UpdatedAt() time.Time SetUpdatedAt(time.Time) // IsRunning returns true if the connector is running and ready to accept // calls to Read or Write (depending on the connector type). IsRunning() bool // Validate checks if the connector is set up correctly. Validate(ctx context.Context, settings map[string]string) error // Errors returns a channel that is used to signal the node that the // connector experienced an error when it was processing something // asynchronously (e.g. persisting state). Errors() <-chan error // Open will start the plugin process and call the Open method on the // plugin. After the connector has been successfully opened it is considered // as running (IsRunning returns true) and can be stopped again with // Teardown. Open will return an error if called on a running connector. Open(context.Context) error // Teardown will call the Teardown method on the plugin and stop the plugin // process. After the connector has been successfully torn down it is // considered as stopped (IsRunning returns false) and can be opened again // with Open. Teardown will return an error if called on a stopped // connector. Teardown(context.Context) error }
type DefaultBuilder ¶
type DefaultBuilder struct {
// contains filtered or unexported fields
}
DefaultBuilder is a Builder that builds regular destinations and sources connected to actual plugins.
func NewDefaultBuilder ¶
func (*DefaultBuilder) Build ¶
func (b *DefaultBuilder) Build(t Type, p ProvisionType) (Connector, error)
type Destination ¶
type Destination interface { Connector State() DestinationState SetState(state DestinationState) // Write sends a record to the connector and returns nil if the record was // successfully received. This does not necessarily mean that the record was // successfully processed and written to the 3rd party system, it might have // been cached and will be written at a later point in time. Acknowledgments // can be received through Ack to figure out if a record was actually // processed or if an error happened while processing it. Write(context.Context, record.Record) error // Ack blocks until an acknowledgment is received that a record was // processed and returns the position of that record. If the record wasn't // successfully processed the function returns the position and an error. Ack(context.Context) (record.Position, error) // Stop signals to the destination that no more records will be produced // after record with the last position. Stop(context.Context, record.Position) error }
Destination is a connector that can write records to a destination.
type DestinationState ¶
type PersistCallback ¶
type PersistCallback func(error)
PersistCallback is a function that's called when a connector is persisted.
type Persister ¶
type Persister struct {
// contains filtered or unexported fields
}
Persister is responsible for persisting connectors and their state when certain thresholds are met.
func NewPersister ¶
func NewPersister( logger log.CtxLogger, db database.DB, delayThreshold time.Duration, bundleCountThreshold int, ) *Persister
NewPersister creates a new persister that stores data into the supplied database when the thresholds are met.
func (*Persister) ConnectorStarted ¶
func (p *Persister) ConnectorStarted()
ConnectorStarted increases the number of connector this persister is persisting. As long as at least one connector is started the Wait function will block, so connectors have to make sure to call ConnectorStopped.
func (*Persister) ConnectorStopped ¶
func (p *Persister) ConnectorStopped()
ConnectorStopped triggers one last flush and decreases the number of connectors this persister is persisting. Once all connectors are stopped the Wait function stops blocking.
func (*Persister) Flush ¶
Flush will trigger a goroutine that persists any in-memory data to the store. To wait for the changes to be actually persisted you need to call Wait.
func (*Persister) Persist ¶
func (p *Persister) Persist(ctx context.Context, conn Connector, callback PersistCallback)
Persist signals the persister that a connector state changed and it should be persisted with the next batch. This function will collect all changed connectors until either the number of detected changes reaches the configured threshold or the configured delay is reached (whichever comes first), then the connectors are flushed and a new batch starts to be collected.
type ProvisionType ¶ added in v0.3.0
type ProvisionType int
ProvisionType defines provisioning type
const ( ProvisionTypeAPI ProvisionType = iota ProvisionTypeConfig )
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages connectors.
func NewService ¶
NewService creates a Store-backed implementation of Service.
func (*Service) AddProcessor ¶
func (s *Service) AddProcessor(ctx context.Context, connectorID string, processorID string) (Connector, error)
AddProcessor adds a processor to a connector.
func (*Service) Create ¶
func (s *Service) Create(ctx context.Context, id string, t Type, cfg Config, p ProvisionType) (Connector, error)
Create will create a connector instance, persist it and return it.
func (*Service) List ¶
List returns a map of Instances keyed by their ID. Instances do not necessarily have a running plugin associated with them.
type Source ¶
type Source interface { Connector State() SourceState SetState(state SourceState) // Read reads data from a data source and returns the record for the // requested position. Read(context.Context) (record.Record, error) // Ack signals to the source that the message has been successfully // processed and can be acknowledged. Ack(context.Context, record.Position) error // Stop signals to the source to stop producing records. After this call // Read will produce records until the record with the last position has // been read (Conduit might have already received that record). Stop(context.Context) (record.Position, error) }
Source is a connector that can read records from a source.
type SourceState ¶
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store handles the persistence and fetching of connectors.
func (*Store) Delete ¶
Delete deletes connector under the key id and returns nil on success, error otherwise.