plugin

package
v0.3.0-nightly.20220802 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2022 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const BuiltinPluginPrefix = "builtin:"

Variables

View Source
var (
	ErrStreamNotOpen    = cerrors.New("stream not open")
	ErrPluginRunning    = cerrors.New("plugin is running")
	ErrPluginNotRunning = cerrors.New("plugin is not running")
)

Functions

func AcceptanceTestV1

func AcceptanceTestV1(t *testing.T, tdf testDispenserFunc)

AcceptanceTestV1 is the acceptance test that all implementations of v1 plugins should pass. It should manually be called from a test case in each implementation:

func TestPlugin(t *testing.T) {
    testDispenser := func() {...}
    plugin.AcceptanceTestV1(t, testDispenser)
}

Types

type DestinationPlugin

type DestinationPlugin interface {
	// Configure provides the configuration to the plugin and sets it up, so it's
	// ready to start running. If the configuration is invalid the method will
	// return an error.
	Configure(context.Context, map[string]string) error

	// Start will trigger a background process in the plugin that will stream
	// records to the plugin and listen to acks. After Start returns Conduit is
	// allowed to call methods Write and Ack. The stream will keep running until
	// the context passed to Start is closed. If the context is closed no more
	// records or acks can be passed between Conduit or the plugin (hard stop).
	// To stop the stream gracefully use the method Stop.
	Start(context.Context) error

	// Write sends a record to the plugin and returns nil if the record was
	// successfully received. This does not necessarily mean that the record was
	// successfully processed and written to the 3rd party system, it might have
	// been cached and will be written at a later point in time. Acknowledgments
	// can be received through Ack to figure out if a record was actually
	// processed or if an error happened while processing it.
	Write(context.Context, record.Record) error
	// Ack blocks until an acknowledgment is received that a record was
	// processed and returns the position of that record. If the record wasn't
	// successfully processed the function returns the position and an error.
	Ack(context.Context) (record.Position, error)

	// Stop should be called to invoke a graceful shutdown of the stream. It
	// will signal the plugin that after receiving the record with the last
	// position no more records will be written to the stream and that the
	// plugin should flush any records that might be cached. The stream will
	// still remain open so Conduit can fetch the remaining acks. After all acks
	// are received Conduit should call Teardown to close the stream. After the
	// stream is closed the Ack method will return the appropriate error
	// signaling the stream is closed.
	Stop(context.Context, record.Position) error

	// Teardown is the last call that must be issued before discarding the
	// plugin. It signals to the plugin it can release any open resources and
	// prepare for a graceful shutdown.
	Teardown(context.Context) error
}

type Dispenser

type Dispenser interface {
	DispenseSpecifier() (SpecifierPlugin, error)
	DispenseSource() (SourcePlugin, error)
	DispenseDestination() (DestinationPlugin, error)
}

Dispenser dispenses specifier, source and destination plugins.

type Parameter

type Parameter struct {
	// Default is the default value of the parameter, if any.
	Default string
	// Required is whether it must be provided in the Config or not.
	Type string
	// Description holds a description of the field and how to configure it.
	Description string
	// Validations list of validations to check for the parameter.
	Validations []Validation
}

Parameter is a helper struct for defining plugin Specifications.

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(builtin registry, standalone registry) *Service

func (*Service) List

func (r *Service) List(ctx context.Context) (map[string]Specification, error)

func (*Service) NewDispenser

func (r *Service) NewDispenser(logger log.CtxLogger, name string) (Dispenser, error)

func (*Service) ValidateDestinationConfig added in v0.2.1

func (r *Service) ValidateDestinationConfig(ctx context.Context, d Dispenser, settings map[string]string) (err error)

func (*Service) ValidateSourceConfig added in v0.2.1

func (r *Service) ValidateSourceConfig(ctx context.Context, d Dispenser, settings map[string]string) (err error)

type SourcePlugin

type SourcePlugin interface {
	// Configure provides the configuration to the plugin and sets it up, so it's
	// ready to start running. If the configuration is invalid the method will
	// return an error.
	Configure(context.Context, map[string]string) error

	// Start will trigger a background process in the plugin that will stream
	// records to Conduit and listen to acks. After Start returns Conduit is
	// allowed to call methods Read and Ack. The stream will keep running until
	// the context passed to Start is closed. If the context is closed no more
	// records or acks can be passed between Conduit or the plugin (hard stop).
	// To stop the stream gracefully use the method Stop.
	Start(context.Context, record.Position) error

	// Read will block until the plugin returns a new record or until the stream
	// is closed (i.e. Stop is called and the plugin closes the stream). All
	// records returned by Read need to be acked using the function Ack and the
	// position of the record. Read will return ErrStreamNotOpen is the stream
	// is not open.
	Read(context.Context) (record.Record, error)
	// Ack signals to the plugin that the record with that position was
	// processed and all resources related to that record can be released.
	Ack(context.Context, record.Position) error

	// Stop should be called to invoke a graceful shutdown of the stream. It
	// will signal the plugin to stop retrieving new records and flush any
	// records that might be cached. The response will contain the position of
	// the last record in the stream. Conduit should keep reading records until
	// it encounters the record with the last position. After it received all
	// records and sent back acks for all successfully processed records it
	// should call Teardown to close the stream. After the stream is closed the
	// Read method will return the appropriate error signaling the stream is
	// closed.
	Stop(context.Context) (record.Position, error)

	// Teardown is the last call that must be issued before discarding the
	// plugin. It signals to the plugin it can release any open resources and
	// prepare for a graceful shutdown.
	Teardown(context.Context) error
}

type Specification

type Specification struct {
	// Name is the name of the plugin.
	Name string
	// Summary is a brief description of the plugin and what it does.
	Summary string
	// Description is a more long form area appropriate for README-like text
	// that the author can provide for documentation about the specified
	// Parameters.
	Description string
	// Version string. Should be a semver prepended with `v`, e.g. `v1.54.3`.
	Version string
	// Author declares the entity that created or maintains this plugin.
	Author string
	// SourceParams and DestinationParams are maps of named Parameters that
	// describe how to configure the plugins Destination or Source.
	SourceParams      map[string]Parameter
	DestinationParams map[string]Parameter
}

Specification is returned by a plugin when Specify is called. It contains information about the configuration parameters for plugins and allows them to describe their parameters.

type SpecifierPlugin

type SpecifierPlugin interface {
	// Specify returns the plugin specification.
	Specify() (Specification, error)
}

type Validation

type Validation struct {
	Type  ValidationType
	Value string
}

type ValidationError added in v0.2.1

type ValidationError struct {
	// contains filtered or unexported fields
}

func (*ValidationError) Error added in v0.2.1

func (e *ValidationError) Error() string

Error formats the error message.

func (*ValidationError) Is added in v0.2.1

func (e *ValidationError) Is(target error) bool

func (*ValidationError) Unwrap added in v0.2.1

func (e *ValidationError) Unwrap() error

Unwrap returns the underlying error.

type ValidationType

type ValidationType int64
const (
	ValidationTypeRequired ValidationType = iota + 1
	ValidationTypeGreaterThan
	ValidationTypeLessThan
	ValidationTypeInclusion
	ValidationTypeExclusion
	ValidationTypeRegex
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL