connector

package
v0.3.0-nightly.20220901
Published: Aug 31, 2022 | License: Apache-2.0 | Imports: 12 | Imported by: 0

Documentation

Constants

const (
	DefaultPersisterDelayThreshold       = time.Second
	DefaultPersisterBundleCountThreshold = 100
)

Variables

var (
	ErrInstanceNotFound     = cerrors.New("connector instance not found")
	ErrInvalidConnectorType = cerrors.New("invalid connector type")
	ErrProcessorIDNotFound  = cerrors.New("processor ID not found")
	ErrConnectorRunning     = cerrors.New("connector is running")
)
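
These variables are sentinel errors, so callers would normally compare against them rather than inspect error strings. The sketch below assumes that errors created with cerrors.New can be matched with the standard library's errors.Is; to stay self-contained it declares a local stand-in with errors.New, and lookup and isNotFound are hypothetical helpers that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInstanceNotFound is a local stand-in for connector.ErrInstanceNotFound,
// built with the standard library instead of cerrors (assumption).
var ErrInstanceNotFound = errors.New("connector instance not found")

// lookup is a hypothetical helper that wraps the sentinel with extra context.
func lookup(known map[string]bool, id string) error {
	if !known[id] {
		return fmt.Errorf("lookup %q: %w", id, ErrInstanceNotFound)
	}
	return nil
}

// isNotFound reports whether err is, or wraps, ErrInstanceNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, ErrInstanceNotFound)
}

func main() {
	known := map[string]bool{"conn-1": true}
	fmt.Println(isNotFound(lookup(known, "conn-1")))
	fmt.Println(isNotFound(lookup(known, "conn-2")))
}
```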

Functions

This section is empty.

Types

type Builder

type Builder interface {
	Build(t Type, p ProvisionType) (Connector, error)

	// Init initializes a connector and validates it to make sure it's ready for use.
	Init(c Connector, id string, config Config) error
}

Builder represents an object that can build a connector. The main use of this interface is to be able to switch out the connector implementations for mocks in tests.
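
As a rough illustration of swapping in a test double, the sketch below declares trimmed local copies of Type, ProvisionType, Config, Connector and Builder so that it compiles on its own. fakeConnector, fakeBuilder and create are hypothetical names, and the local Connector interface keeps only three of the real interface's methods.

```go
package main

import (
	"errors"
	"fmt"
)

// Local, trimmed stand-ins for the package's types (assumption: only the
// parts needed for the sketch are reproduced here).
type Type int

const (
	TypeSource Type = iota + 1
	TypeDestination
)

type ProvisionType int

type Config struct {
	Name   string
	Plugin string
}

// Connector keeps only three methods of the real interface.
type Connector interface {
	ID() string
	Type() Type
	Config() Config
}

type Builder interface {
	Build(t Type, p ProvisionType) (Connector, error)
	Init(c Connector, id string, config Config) error
}

// fakeConnector is a hypothetical in-memory connector with no plugin behind it.
type fakeConnector struct {
	id  string
	typ Type
	cfg Config
}

func (c *fakeConnector) ID() string     { return c.id }
func (c *fakeConnector) Type() Type     { return c.typ }
func (c *fakeConnector) Config() Config { return c.cfg }

// fakeBuilder satisfies Builder without starting any plugin process.
type fakeBuilder struct{}

func (fakeBuilder) Build(t Type, _ ProvisionType) (Connector, error) {
	if t != TypeSource && t != TypeDestination {
		return nil, errors.New("invalid connector type")
	}
	return &fakeConnector{typ: t}, nil
}

func (fakeBuilder) Init(c Connector, id string, cfg Config) error {
	fc, ok := c.(*fakeConnector)
	if !ok {
		return errors.New("unexpected connector implementation")
	}
	fc.id, fc.cfg = id, cfg
	return nil
}

// create is a hypothetical caller that depends only on the Builder interface.
func create(b Builder, id string, t Type, cfg Config) (Connector, error) {
	c, err := b.Build(t, ProvisionType(0))
	if err != nil {
		return nil, err
	}
	if err := b.Init(c, id, cfg); err != nil {
		return nil, err
	}
	return c, nil
}

func main() {
	c, err := create(fakeBuilder{}, "conn-1", TypeSource, Config{Name: "demo"})
	if err != nil {
		fmt.Println("create failed:", err)
		return
	}
	fmt.Println(c.ID(), c.Config().Name)
}
```

Because create accepts the interface, a test can pass fakeBuilder where production code would pass a *DefaultBuilder.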

type Config

type Config struct {
	Name       string
	Settings   map[string]string
	Plugin     string
	PipelineID string

	ProcessorIDs []string
}

Config collects common data stored for a connector.

type Connector

type Connector interface {
	ID() string
	Type() Type

	Config() Config
	SetConfig(Config)

	ProvisionedBy() ProvisionType

	CreatedAt() time.Time
	UpdatedAt() time.Time
	SetUpdatedAt(time.Time)

	// IsRunning returns true if the connector is running and ready to accept
	// calls to Read or Write (depending on the connector type).
	IsRunning() bool
	// Validate checks if the connector is set up correctly.
	Validate(ctx context.Context, settings map[string]string) error

	// Errors returns a channel that is used to signal the node that the
	// connector experienced an error when it was processing something
	// asynchronously (e.g. persisting state).
	Errors() <-chan error

	// Open will start the plugin process and call the Open method on the
	// plugin. After the connector has been successfully opened it is considered
	// as running (IsRunning returns true) and can be stopped again with
	// Teardown. Open will return an error if called on a running connector.
	Open(context.Context) error

	// Teardown will call the Teardown method on the plugin and stop the plugin
	// process. After the connector has been successfully torn down it is
	// considered as stopped (IsRunning returns false) and can be opened again
	// with Open. Teardown will return an error if called on a stopped
	// connector.
	Teardown(context.Context) error
}
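
The Open and Teardown comments describe a two-state lifecycle: each call fails when the connector is already in the target state. A minimal sketch of that rule follows, assuming a mutex-guarded boolean; the lifecycle type, the cycle helper and errNotRunning are hypothetical, and no plugin process is involved.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var (
	// errRunning mirrors the message of ErrConnectorRunning.
	errRunning = errors.New("connector is running")
	// errNotRunning is hypothetical; the package lists no such variable.
	errNotRunning = errors.New("connector is not running")
)

// lifecycle tracks only the running flag of a connector.
type lifecycle struct {
	mu      sync.Mutex
	running bool
}

// Open marks the connector as running, or fails if it already is.
func (l *lifecycle) Open(_ context.Context) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running {
		return errRunning
	}
	l.running = true
	return nil
}

// Teardown marks the connector as stopped, or fails if it already is.
func (l *lifecycle) Teardown(_ context.Context) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !l.running {
		return errNotRunning
	}
	l.running = false
	return nil
}

// IsRunning reports the current state.
func (l *lifecycle) IsRunning() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.running
}

// cycle calls Open, Open, Teardown, Teardown and returns each result in order.
func cycle() []error {
	ctx := context.Background()
	l := &lifecycle{}
	return []error{l.Open(ctx), l.Open(ctx), l.Teardown(ctx), l.Teardown(ctx)}
}

func main() {
	for i, err := range cycle() {
		fmt.Println(i, err)
	}
}
```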

type DefaultBuilder

type DefaultBuilder struct {
	// contains filtered or unexported fields
}

DefaultBuilder is a Builder that builds regular destinations and sources connected to actual plugins.

func NewDefaultBuilder

func NewDefaultBuilder(logger log.CtxLogger, persister *Persister, service *plugin.Service) *DefaultBuilder

func (*DefaultBuilder) Build

func (b *DefaultBuilder) Build(t Type, p ProvisionType) (Connector, error)

func (*DefaultBuilder) Init

func (b *DefaultBuilder) Init(c Connector, id string, config Config) error

type Destination

type Destination interface {
	Connector

	State() DestinationState
	SetState(state DestinationState)

	// Write sends a record to the connector and returns nil if the record was
	// successfully received. This does not necessarily mean that the record was
	// successfully processed and written to the 3rd party system, it might have
	// been cached and will be written at a later point in time. Acknowledgments
	// can be received through Ack to figure out if a record was actually
	// processed or if an error happened while processing it.
	Write(context.Context, record.Record) error
	// Ack blocks until an acknowledgment is received that a record was
	// processed and returns the position of that record. If the record wasn't
	// successfully processed the function returns the position and an error.
	Ack(context.Context) (record.Position, error)

	// Stop signals to the destination that no more records will be produced
	// after the record with the last position.
	Stop(context.Context, record.Position) error
}

Destination is a connector that can write records to a destination.
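
The split between Write (record accepted) and Ack (record processed) can be sketched with two channels and a worker goroutine. This is an illustration only: Position and Record are local stand-ins for record.Position and record.Record, memDestination and writeAll are hypothetical, and "processing" is reduced to rejecting empty payloads.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Position and Record are local stand-ins for the record package's types.
type Position []byte

type Record struct {
	Position Position
	Payload  string
}

type ack struct {
	pos Position
	err error
}

// memDestination accepts records in Write and processes them asynchronously.
type memDestination struct {
	pending chan Record
	acks    chan ack
}

func newMemDestination(buffer int) *memDestination {
	d := &memDestination{
		pending: make(chan Record, buffer),
		acks:    make(chan ack, buffer),
	}
	go d.process()
	return d
}

// process handles records in order and emits one acknowledgment per record.
func (d *memDestination) process() {
	for r := range d.pending {
		var err error
		if r.Payload == "" {
			err = errors.New("empty payload")
		}
		d.acks <- ack{pos: r.Position, err: err}
	}
}

// Write returns nil once the record is queued, before it is processed.
func (d *memDestination) Write(ctx context.Context, r Record) error {
	select {
	case d.pending <- r:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Ack blocks until the next acknowledgment and returns its position and error.
func (d *memDestination) Ack(ctx context.Context) (Position, error) {
	select {
	case a := <-d.acks:
		return a.pos, a.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// writeAll writes every record, then collects one acknowledgment per record.
func writeAll(records []Record) (acked, failed []string, err error) {
	ctx := context.Background()
	d := newMemDestination(len(records))
	defer close(d.pending) // lets the worker goroutine exit
	for _, r := range records {
		if err := d.Write(ctx, r); err != nil {
			return nil, nil, err
		}
	}
	for range records {
		pos, ackErr := d.Ack(ctx)
		if ackErr != nil {
			failed = append(failed, string(pos))
		} else {
			acked = append(acked, string(pos))
		}
	}
	return acked, failed, nil
}

func main() {
	acked, failed, err := writeAll([]Record{
		{Position: Position("p1"), Payload: "hello"},
		{Position: Position("p2"), Payload: ""},
	})
	fmt.Println(acked, failed, err)
}
```

Both Write calls succeed even though the second record later fails; the failure only becomes visible through Ack.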

type DestinationState

type DestinationState struct {
	Positions map[string]record.Position
}

type PersistCallback

type PersistCallback func(error)

PersistCallback is a function that's called when a connector is persisted.

type Persister

type Persister struct {
	// contains filtered or unexported fields
}

Persister is responsible for persisting connectors and their state when certain thresholds are met.

func NewPersister

func NewPersister(
	logger log.CtxLogger,
	db database.DB,
	delayThreshold time.Duration,
	bundleCountThreshold int,
) *Persister

NewPersister creates a new persister that stores data into the supplied database when the thresholds are met.

func (*Persister) ConnectorStarted

func (p *Persister) ConnectorStarted()

ConnectorStarted increases the number of connectors this persister is persisting. As long as at least one connector is started, the Wait function will block, so connectors have to make sure to call ConnectorStopped.

func (*Persister) ConnectorStopped

func (p *Persister) ConnectorStopped()

ConnectorStopped triggers one last flush and decreases the number of connectors this persister is persisting. Once all connectors are stopped the Wait function stops blocking.

func (*Persister) Flush

func (p *Persister) Flush(ctx context.Context)

Flush will trigger a goroutine that persists any in-memory data to the store. To wait for the changes to be actually persisted you need to call Wait.

func (*Persister) Persist

func (p *Persister) Persist(ctx context.Context, conn Connector, callback PersistCallback)

Persist signals the persister that a connector state changed and it should be persisted with the next batch. This function will collect all changed connectors until either the number of detected changes reaches the configured threshold or the configured delay is reached (whichever comes first), then the connectors are flushed and a new batch starts to be collected.
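
The "count or delay, whichever comes first" rule can be sketched as a small batcher. This is not the Persister's implementation: batcher, its fields and demo are hypothetical, connectors are reduced to string IDs, and the flush callback stands in for writing to the database.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher collects IDs and flushes them when either threshold is reached.
type batcher struct {
	mu       sync.Mutex
	batch    []string
	gen      int // incremented on every flush so stale timers can be ignored
	timer    *time.Timer
	maxCount int
	maxDelay time.Duration
	flush    func(ids []string) // called with mu held; must not call Add
}

// Add appends id to the current batch and flushes if the count is reached.
// The first ID of a batch starts the delay timer.
func (b *batcher) Add(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.batch = append(b.batch, id)
	switch {
	case len(b.batch) >= b.maxCount:
		b.flushLocked()
	case len(b.batch) == 1:
		gen := b.gen
		b.timer = time.AfterFunc(b.maxDelay, func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			if b.gen == gen { // skip if this batch was already flushed
				b.flushLocked()
			}
		})
	}
}

// flushLocked hands the batch to flush and starts a new one.
func (b *batcher) flushLocked() {
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}
	if len(b.batch) == 0 {
		return
	}
	out := b.batch
	b.batch = nil
	b.gen++
	b.flush(out)
}

// demo returns the size of a batch flushed by count and of one flushed by delay.
func demo() (byCount, byDelay int) {
	flushed := make(chan []string, 1)
	record := func(ids []string) { flushed <- ids }

	// A long delay ensures only the count threshold can trigger this flush.
	b1 := &batcher{maxCount: 2, maxDelay: time.Hour, flush: record}
	b1.Add("conn-1")
	b1.Add("conn-2")
	byCount = len(<-flushed)

	// A high count ensures only the delay threshold can trigger this flush.
	b2 := &batcher{maxCount: 100, maxDelay: 10 * time.Millisecond, flush: record}
	b2.Add("conn-3")
	byDelay = len(<-flushed)
	return byCount, byDelay
}

func main() {
	fmt.Println(demo())
}
```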

func (*Persister) Wait

func (p *Persister) Wait()

Wait waits for all connectors to stop running and for the last flush to be executed.
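
The interplay of ConnectorStarted, ConnectorStopped and Wait resembles a sync.WaitGroup. The sketch below assumes that model; whether Persister uses a WaitGroup internally is not stated here. tracker and runConnectors are hypothetical, and the final flush is reduced to a counter.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tracker counts started connectors and lets callers wait for all to stop.
type tracker struct {
	flushes int64 // number of final flushes, updated atomically
	wg      sync.WaitGroup
}

// ConnectorStarted registers one more running connector.
func (t *tracker) ConnectorStarted() { t.wg.Add(1) }

// ConnectorStopped records a final flush and unregisters the connector.
func (t *tracker) ConnectorStopped() {
	atomic.AddInt64(&t.flushes, 1) // stands in for the last flush
	t.wg.Done()
}

// Wait blocks until every started connector has stopped.
func (t *tracker) Wait() { t.wg.Wait() }

// runConnectors starts n connectors, waits for them, and returns the number
// of final flushes that were recorded.
func runConnectors(n int) int64 {
	t := &tracker{}
	for i := 0; i < n; i++ {
		t.ConnectorStarted()
		go func() {
			defer t.ConnectorStopped()
			// connector work would happen here
		}()
	}
	t.Wait()
	return atomic.LoadInt64(&t.flushes)
}

func main() {
	fmt.Println(runConnectors(3))
}
```

A connector that calls ConnectorStarted but never ConnectorStopped would keep Wait blocked forever, which is why the stop call is deferred in the sketch.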

type ProvisionType added in v0.3.0

type ProvisionType int

ProvisionType defines the provisioning type.

const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
)

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages connectors.

func NewService

func NewService(logger log.CtxLogger, db database.DB, connBuilder Builder) *Service

NewService creates a Store-backed implementation of Service.

func (*Service) AddProcessor

func (s *Service) AddProcessor(ctx context.Context, connectorID string, processorID string) (Connector, error)

AddProcessor adds a processor to a connector.

func (*Service) Create

func (s *Service) Create(ctx context.Context, id string, t Type, cfg Config, p ProvisionType) (Connector, error)

Create will create a connector instance, persist it and return it.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, id string) error

Delete removes the connector with the given ID.

func (*Service) Get

func (s *Service) Get(ctx context.Context, id string) (Connector, error)

Get retrieves a single connector instance by ID.

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init fetches connectors from the store.

func (*Service) List

func (s *Service) List(ctx context.Context) map[string]Connector

List returns a map of connector instances keyed by their ID. Instances do not necessarily have a running plugin associated with them.

func (*Service) RemoveProcessor

func (s *Service) RemoveProcessor(ctx context.Context, connectorID string, processorID string) (Connector, error)

RemoveProcessor removes a processor from a connector.

func (*Service) Update

func (s *Service) Update(ctx context.Context, id string, data Config) (Connector, error)

Update updates the connector config.

type Source

type Source interface {
	Connector

	State() SourceState
	SetState(state SourceState)

	// Read reads data from a data source and returns the record for the
	// requested position.
	Read(context.Context) (record.Record, error)

	// Ack signals to the source that the message has been successfully
	// processed and can be acknowledged.
	Ack(context.Context, record.Position) error

	// Stop signals to the source to stop producing records. After this call
	// Read will produce records until the record with the last position has
	// been read (Conduit might have already received that record).
	Stop(context.Context) (record.Position, error)
}

Source is a connector that can read records from a source.

type SourceState

type SourceState struct {
	Position record.Position
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store handles the persistence and fetching of connectors.

func NewStore

func NewStore(db database.DB, logger log.CtxLogger, builder Builder) *Store

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete deletes the connector stored under the key id. It returns nil on success and an error otherwise.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (Connector, error)

Get will return the connector for a given id or an error.

func (*Store) GetAll

func (s *Store) GetAll(ctx context.Context) (map[string]Connector, error)

GetAll returns all connectors stored in the database.

func (*Store) Set

func (s *Store) Set(ctx context.Context, id string, c Connector) error

Set stores the connector under the key id. It returns nil on success and an error otherwise.

type Type

type Type int

Type defines the connector type.

const (
	TypeSource Type = iota + 1
	TypeDestination
)
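
Because the constants start at iota + 1, the zero value of Type matches neither TypeSource nor TypeDestination, so an unset Type can be told apart from a valid one. (ProvisionType, by contrast, starts at iota, so its zero value is ProvisionTypeAPI.) The sketch below shows one way a caller might check this; checkType and errInvalidType are hypothetical and say nothing about how the package itself validates the type.

```go
package main

import (
	"errors"
	"fmt"
)

// Type and its constants are copied from the declaration above.
type Type int

const (
	TypeSource Type = iota + 1
	TypeDestination
)

// errInvalidType mirrors the message of ErrInvalidConnectorType.
var errInvalidType = errors.New("invalid connector type")

// checkType is a hypothetical validator that rejects anything but the two
// declared constants, including the zero value.
func checkType(t Type) error {
	switch t {
	case TypeSource, TypeDestination:
		return nil
	default:
		return errInvalidType
	}
}

func main() {
	var unset Type // zero value
	fmt.Println(int(TypeSource), int(TypeDestination))
	fmt.Println(checkType(unset))
	fmt.Println(checkType(TypeDestination))
}
```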

func (Type) String

func (i Type) String() string

Directories

Package mock is a generated GoMock package.