destination

package
v3.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2023 License: MPL-2.0 Imports: 19 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PluginTestSuiteRunner

func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testSourceOptions ...func(o *schema.TestSourceOptions))

func RecordDiff

func RecordDiff(l arrow.Record, r arrow.Record) string

Types

type Client

// Client is the interface a destination backend implements; a NewClientFunc
// returns one. It embeds both ManagedWriter and UnmanagedWriter, so an
// implementation must provide WriteTableBatch as well as Write and Metrics.
type Client interface {
	// Migrate is called with the tables to migrate.
	// NOTE(review): presumably creates or alters destination tables to match
	// the given schema; confirm against implementations.
	Migrate(ctx context.Context, tables schema.Tables) error
	// Read is handed a send-only channel res.
	// NOTE(review): presumably sends the records stored for table and
	// sourceName on res; confirm against implementations.
	Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error
	ManagedWriter
	UnmanagedWriter
	// DeleteStale receives the tables, the source name and a sync time.
	// NOTE(review): presumably removes rows written by sourceName before
	// syncTime; confirm against implementations.
	DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error
	// Close shuts the client down.
	// NOTE(review): presumably releases any resources it holds; confirm.
	Close(ctx context.Context) error
}

type ClientResource

// ClientResource pairs a table name with a slice of untyped values.
// NOTE(review): presumably Data holds one value per column of the named
// table; confirm against callers.
type ClientResource struct {
	// TableName is the name of the table the data belongs to.
	TableName string
	// Data holds the values as untyped (any) elements.
	Data      []any
}

type ManagedWriter

// ManagedWriter is the batch-oriented write interface: records arrive as a
// slice for one table per call.
// NOTE(review): presumably used when the plugin is created with
// WithManagedWriter, with the plugin itself doing the batching; confirm.
type ManagedWriter interface {
	// WriteTableBatch writes data, a batch of records, for the given table.
	WriteTableBatch(ctx context.Context, table *schema.Table, data []arrow.Record) error
}

type Metrics

// Metrics holds write counters. It is the value returned by
// UnmanagedWriter.Metrics and Plugin.Metrics.
type Metrics struct {
	// Errors is the number of errors / failed writes.
	Errors uint64
	// Writes is the number of successful writes.
	Writes uint64
}

type MigrateStrategy

// MigrateStrategy defines which migration tests should be included. It holds
// one specs.MigrateMode per kind of schema change.
// NOTE(review): the per-field descriptions below are inferred from the field
// names; confirm against the test suite.
type MigrateStrategy struct {
	// AddColumn is the mode for adding a column.
	AddColumn           specs.MigrateMode
	// AddColumnNotNull is the mode for adding a NOT NULL column.
	AddColumnNotNull    specs.MigrateMode
	// RemoveColumn is the mode for removing a column.
	RemoveColumn        specs.MigrateMode
	// RemoveColumnNotNull is the mode for removing a NOT NULL column.
	RemoveColumnNotNull specs.MigrateMode
	// ChangeColumn is the mode for changing an existing column.
	ChangeColumn        specs.MigrateMode
}

MigrateStrategy defines which migration tests the test suite should include, with one specs.MigrateMode per kind of schema change.

type NewClientFunc

// NewClientFunc constructs a destination Client from a context, a logger and
// the destination spec. It is the constructor passed to NewPlugin.
type NewClientFunc func(context.Context, zerolog.Logger, specs.Destination) (Client, error)

type NewPluginFunc

// NewPluginFunc returns a *Plugin. PluginTestSuiteRunner takes one as its
// newPlugin argument.
type NewPluginFunc func() *Plugin

type Option

// Option is a functional option applied to a *Plugin; options are passed to
// NewPlugin (see WithBatchTimeout, WithDefaultBatchSize,
// WithDefaultBatchSizeBytes and WithManagedWriter).
type Option func(*Plugin)

func WithBatchTimeout

func WithBatchTimeout(seconds int) Option

func WithDefaultBatchSize

func WithDefaultBatchSize(defaultBatchSize int) Option

func WithDefaultBatchSizeBytes

func WithDefaultBatchSizeBytes(defaultBatchSizeBytes int) Option

func WithManagedWriter

func WithManagedWriter() Option

type Plugin

// Plugin is a destination plugin. Create one with NewPlugin and initialize it
// with Init before use.
type Plugin struct {
	// contains filtered or unexported fields
}

func NewPlugin

func NewPlugin(name string, version string, newClientFunc NewClientFunc, opts ...Option) *Plugin

NewPlugin creates a new destination plugin.

func (*Plugin) Close

func (p *Plugin) Close(ctx context.Context) error

func (*Plugin) DeleteStale

func (p *Plugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error

func (*Plugin) Init

func (p *Plugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error

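Init initializes the plugin with the given logger and destination spec. We need lazy loading because we want to be able to initialize after the plugin has been created (NewPlugin takes neither a logger nor a spec).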

func (*Plugin) Metrics

func (p *Plugin) Metrics() Metrics

func (*Plugin) Migrate

func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables) error

We implement all destination Client functions on Plugin so we can hook into pre- and post-call behavior.

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Read

func (p *Plugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error

func (*Plugin) Version

func (p *Plugin) Version() string

func (*Plugin) Write

func (p *Plugin) Write(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan arrow.Record) error

type PluginTestSuite

// PluginTestSuite is the destination plugin test suite type.
// NOTE(review): all of its fields are unexported and not visible here;
// presumably it carries the state used by PluginTestSuiteRunner. Confirm.
type PluginTestSuite struct {
	// contains filtered or unexported fields
}

type PluginTestSuiteTests

// PluginTestSuiteTests configures the tests run by PluginTestSuiteRunner.
// Each Skip* flag disables one test when set to true; the zero value skips
// nothing.
type PluginTestSuiteTests struct {
	// SkipOverwrite skips testing for "overwrite" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipOverwrite bool

	// SkipDeleteStale skips testing "delete-stale" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipDeleteStale bool

	// SkipAppend skips testing for "append" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipAppend bool

	// SkipSecondAppend skips the second append step in the test.
	// This is useful in cases like cloud storage where you can't append to an
	// existing object after the file has been closed.
	SkipSecondAppend bool

	// SkipMigrateAppend skips a test for the migrate function where a column is added,
	// data is appended, then the column is removed and more data appended, checking that the migrations handle
	// this correctly.
	SkipMigrateAppend bool
	// SkipMigrateAppendForce skips a test for the migrate function where a column is changed in force mode
	SkipMigrateAppendForce bool

	// SkipMigrateOverwrite skips a test for the migrate function where a column is added,
	// data is appended, then the column is removed and more data overwritten, checking that the migrations handle
	// this correctly.
	SkipMigrateOverwrite bool
	// SkipMigrateOverwriteForce skips a test for the migrate function where a column is changed in force mode
	SkipMigrateOverwriteForce bool

	// MigrateStrategyOverwrite is the MigrateStrategy for the overwrite
	// migration tests.
	// NOTE(review): inferred from the field name; confirm against the suite.
	MigrateStrategyOverwrite MigrateStrategy
	// MigrateStrategyAppend is the MigrateStrategy for the append migration
	// tests.
	// NOTE(review): inferred from the field name; confirm against the suite.
	MigrateStrategyAppend    MigrateStrategy
}

type UnimplementedManagedWriter

// UnimplementedManagedWriter is an empty struct with a WriteTableBatch method.
// NOTE(review): presumably meant to be embedded by clients that only
// implement UnmanagedWriter, so they still satisfy Client; confirm.
type UnimplementedManagedWriter struct{}

func (UnimplementedManagedWriter) WriteTableBatch

type UnimplementedUnmanagedWriter

// UnimplementedUnmanagedWriter is an empty struct with Write and Metrics
// methods.
// NOTE(review): presumably meant to be embedded by clients that only
// implement ManagedWriter, so they still satisfy Client; confirm.
type UnimplementedUnmanagedWriter struct{}

func (UnimplementedUnmanagedWriter) Metrics

func (UnimplementedUnmanagedWriter) Write

type UnmanagedWriter

// UnmanagedWriter is the channel-oriented write interface: records arrive on
// a receive-only channel rather than as pre-built batches, and the
// implementation reports its own Metrics.
type UnmanagedWriter interface {
	// Write receives records from res for the given tables.
	// NOTE(review): presumably returns once res is closed or ctx is done;
	// confirm against implementations.
	Write(ctx context.Context, tables schema.Tables, res <-chan arrow.Record) error
	// Metrics returns the writer's error and write counters.
	Metrics() Metrics
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL