connector

package
v0.8.0-nightly.20231104 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultPersisterDelayThreshold       = time.Second
	DefaultPersisterBundleCountThreshold = 10000
)

Variables

View Source
var (
	ErrInstanceNotFound          = cerrors.New("connector instance not found")
	ErrInvalidConnectorType      = cerrors.New("invalid connector type")
	ErrInvalidConnectorStateType = cerrors.New("invalid connector state type")
	ErrProcessorIDNotFound       = cerrors.New("processor ID not found")
	ErrConnectorRunning          = cerrors.New("connector is running")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	Name     string
	Settings map[string]string
}

Config collects common data stored for a connector.

type Connector

type Connector interface {
	OnDelete(ctx context.Context) error
}

type Destination

type Destination struct {
	Instance *Instance
	// contains filtered or unexported fields
}

func (*Destination) Ack added in v0.2.0

func (*Destination) Errors added in v0.5.0

func (d *Destination) Errors() <-chan error

func (*Destination) ID added in v0.5.0

func (d *Destination) ID() string

func (*Destination) OnDelete added in v0.6.0

func (d *Destination) OnDelete(ctx context.Context) (err error)

func (*Destination) Open added in v0.5.0

func (d *Destination) Open(ctx context.Context) (err error)

func (*Destination) Stop added in v0.3.0

func (d *Destination) Stop(ctx context.Context, lastPosition record.Position) error

func (*Destination) Teardown added in v0.5.0

func (d *Destination) Teardown(ctx context.Context) error

func (*Destination) Write

func (d *Destination) Write(ctx context.Context, r record.Record) error

type DestinationState

type DestinationState struct {
	Positions map[string]record.Position
}

type Instance added in v0.5.0

type Instance struct {
	ID         string
	Type       Type
	Config     Config
	PipelineID string
	Plugin     string

	ProcessorIDs  []string
	State         any
	ProvisionedBy ProvisionType
	CreatedAt     time.Time
	UpdatedAt     time.Time
	// LastActiveConfig is the last config that was used to successfully start
	// the connector. If Config and LastActiveConfig are not the same, the
	// connector will trigger a connector lifecycle event.
	LastActiveConfig Config

	sync.RWMutex
	// contains filtered or unexported fields
}

Instance represents a connector instance.

func (*Instance) Close added in v0.6.0

func (i *Instance) Close(ctx context.Context, dispenserFetcher PluginDispenserFetcher) error

func (*Instance) Connector added in v0.5.0

func (i *Instance) Connector(_ context.Context, dispenserFetcher PluginDispenserFetcher) (Connector, error)

Connector fetches a new plugin dispenser and returns a connector that can be used to interact with that plugin. If Instance.Type is TypeSource this method returns *Source; if it is TypeDestination it returns *Destination; otherwise it returns an error. The plugin is not started in this method; that happens when Open is called on the returned connector. If a connector is already running for this Instance, this method returns an error.

func (*Instance) Init added in v0.5.0

func (i *Instance) Init(logger log.CtxLogger, persister *Persister)

func (*Instance) Inspect added in v0.5.0

func (i *Instance) Inspect(ctx context.Context) *inspector.Session

Inspect returns an inspector.Session which exposes the records coming into or out of this connector (depending on the connector type).

type PersistCallback

type PersistCallback func(error)

PersistCallback is a function that's called when a connector is persisted.

type Persister

type Persister struct {
	// contains filtered or unexported fields
}

Persister is responsible for persisting connectors and their state when certain thresholds are met.

func NewPersister

func NewPersister(
	logger log.CtxLogger,
	db database.DB,
	delayThreshold time.Duration,
	bundleCountThreshold int,
) *Persister

NewPersister creates a new persister that stores data into the supplied database when the thresholds are met.

func (*Persister) ConnectorStarted

func (p *Persister) ConnectorStarted()

ConnectorStarted increases the number of connectors this persister is persisting. As long as at least one connector is started the Wait function will block, so connectors have to make sure to call ConnectorStopped.

func (*Persister) ConnectorStopped

func (p *Persister) ConnectorStopped()

ConnectorStopped triggers one last flush and decreases the number of connectors this persister is persisting. Once all connectors are stopped the Wait function stops blocking.

func (*Persister) Flush

func (p *Persister) Flush(ctx context.Context)

Flush will trigger a goroutine that persists any in-memory data to the store. To wait for the changes to be actually persisted you need to call Wait.

func (*Persister) Persist

func (p *Persister) Persist(ctx context.Context, conn *Instance, callback PersistCallback) error

Persist signals the persister that a connector's state has changed and that it should be persisted with the next batch. This function will collect all changed connectors until either the number of detected changes reaches the configured threshold or the configured delay elapses (whichever comes first); then the connectors are flushed and a new batch starts to be collected.

func (*Persister) Wait

func (p *Persister) Wait()

Wait waits for all connectors to stop running and for the last flush to be executed.

type PluginDispenserFetcher added in v0.5.0

type PluginDispenserFetcher interface {
	NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
}

PluginDispenserFetcher can fetch a plugin dispenser.

type ProvisionType added in v0.3.0

type ProvisionType int

ProvisionType defines the provisioning type.

const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
	ProvisionTypeDLQ // used for provisioning DLQ connectors which are not persisted
)

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages connectors.

func NewService

func NewService(logger log.CtxLogger, db database.DB, persister *Persister) *Service

NewService creates a Store-backed implementation of Service.

func (*Service) AddProcessor

func (s *Service) AddProcessor(ctx context.Context, connectorID string, processorID string) (*Instance, error)

AddProcessor adds a processor to a connector.

func (*Service) Check added in v0.5.1

func (s *Service) Check(ctx context.Context) error

func (*Service) Create

func (s *Service) Create(
	ctx context.Context,
	id string,
	t Type,
	plugin string,
	pipelineID string,
	cfg Config,
	p ProvisionType,
) (*Instance, error)

Create will create a connector instance, persist it and return it.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, id string, dispenserFetcher PluginDispenserFetcher) error

Delete removes the connector.

func (*Service) Get

func (s *Service) Get(_ context.Context, id string) (*Instance, error)

Get retrieves a single connector instance by ID.

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init fetches connectors from the store.

func (*Service) List

func (s *Service) List(context.Context) map[string]*Instance

List returns a map of Instances keyed by their ID. Instances do not necessarily have a running plugin associated with them.

func (*Service) RemoveProcessor

func (s *Service) RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*Instance, error)

RemoveProcessor removes a processor from a connector.

func (*Service) SetState added in v0.5.0

func (s *Service) SetState(ctx context.Context, id string, state any) (*Instance, error)

func (*Service) Update

func (s *Service) Update(ctx context.Context, id string, data Config) (*Instance, error)

Update updates the connector config.

type Source

type Source struct {
	Instance *Instance
	// contains filtered or unexported fields
}

func (*Source) Ack

func (s *Source) Ack(ctx context.Context, p record.Position) error

func (*Source) Errors added in v0.5.0

func (s *Source) Errors() <-chan error

func (*Source) ID added in v0.5.0

func (s *Source) ID() string

func (*Source) OnDelete added in v0.6.0

func (s *Source) OnDelete(ctx context.Context) (err error)

func (*Source) Open added in v0.5.0

func (s *Source) Open(ctx context.Context) (err error)

func (*Source) Read

func (s *Source) Read(ctx context.Context) (record.Record, error)

func (*Source) Stop added in v0.2.0

func (s *Source) Stop(ctx context.Context) (record.Position, error)

func (*Source) Teardown added in v0.5.0

func (s *Source) Teardown(ctx context.Context) error

type SourceState

type SourceState struct {
	Position record.Position
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store handles the persistence and fetching of connectors.

func NewStore

func NewStore(db database.DB, logger log.CtxLogger) *Store

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete deletes the connector stored under the key id and returns nil on success or an error otherwise.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*Instance, error)

Get will return the connector for a given id or an error.

func (*Store) GetAll

func (s *Store) GetAll(ctx context.Context) (map[string]*Instance, error)

GetAll returns all connectors stored in the database.

func (*Store) PrepareSet added in v0.5.3

func (s *Store) PrepareSet(id string, instance *Instance) (func(context.Context) error, error)

PrepareSet encodes the connector instance and returns a function that stores the connector. This can be used to prepare everything needed to store an instance without actually storing it yet.

func (*Store) Set

func (s *Store) Set(ctx context.Context, id string, c *Instance) error

Set stores the connector under the key id and returns nil on success or an error otherwise.

type Type

type Type int

Type defines the connector type.

const (
	TypeSource Type = iota + 1
	TypeDestination
)

func (Type) String

func (i Type) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL