providers

package
v0.0.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

README

Data Plane Providers

Each integrated data storage is combination of provider interface. Provider is a struct that combine enabled features of storage that this provider implements. Bare minimum implementation for providers looks follow:

// Register provider factory into data plane provider registry
func init() {
	providers.Register(ProviderType, New)
}

// Define new ProviderType enum value
const ProviderType = abstract.ProviderType("my-awesome-provider")

// Provider impl
type MyAwesomeProvider struct {
	logger   log.Logger
	registry metrics.Registry
	cp       cpclient.ControlPlane
	transfer *server.Transfer
}

// Define type for new provider Imple
func (p MyAwesomeProvider) Type() abstract.ProviderType {
	return ProviderType
}

func New(lgr log.Logger, registry metrics.Registry, cp cpclient.ControlPlane, transfer *server.Transfer) providers.Provider {
	return &Provider{
		logger:   lgr,
		registry: registry,
		cp:       cp,
		transfer: transfer,
	}
}

Depenency graph

Provider is one of core interfaces. Each specific provider register itself in providers registry. Each task/sink/storage construct anything provider-related from providers registry.

new-dep-graph

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	NetworkUnreachable = coded.Register("generic", "network", "unreachable")
	UnknownCluster     = coded.Register("generic", "unknown_cluster")
	InvalidCredential  = coded.Register("generic", "invalid_credentials")

	// MissingData means that user asked for a table / topic / object wich is not exists on a source side
	MissingData = coded.Register("generic", "missing_data")

	// DataOutOfRange means data type is correct but the value is outside the supported range
	DataOutOfRange = coded.Register("data", "out_of_range")
	// UnsupportedConversion means the source data type cannot be converted into the target data type
	UnsupportedConversion = coded.Register("data", "unsupported_type_conversion")
	// DataValueError means there is something wrong with the value itself (i.e value []byte("foo") cannot be put into Decimal field)
	DataValueError = coded.Register("data", "value_error")
)
View Source
var NopActivateCallback = ActivateCallbacks{
	Cleanup:       func(table abstract.TableMap) error { return nil },
	Upload:        func(table abstract.TableMap) error { return nil },
	CheckIncludes: func(table abstract.TableMap) error { return nil },
	Rollbacks:     new(util.Rollbacks),
}

Functions

func Destination

func Destination[T Provider](lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, transfer *server.Transfer) (T, bool)

Destination resolve a specific provider interface from registry by `transfer.DstType()` provider type.

func Register

func Register(providerType abstract.ProviderType, fac ProviderFactory)

Register add new provider factory to known providers registry.

func Source

func Source[T Provider](lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, transfer *server.Transfer) (T, bool)

Source resolve a specific provider interface from registry by `transfer.SrcType()` provider type.

Types

type Abstract2Provider

type Abstract2Provider interface {
	Provider
	DataProvider() (base.DataProvider, error)
}

Abstract2Provider add `base.DataProvider` factory to provider. this means that provider can do abstract2 data provider

type Abstract2Sinker

type Abstract2Sinker interface {
	Provider
	Target(...abstract.SinkOption) (base.EventTarget, error)
}

Abstract2Sinker add abstract2 writer factory to provider

type ActivateCallbacks

type ActivateCallbacks struct {
	Cleanup       TablesOperationFunc
	Upload        TablesOperationFunc
	CheckIncludes TablesOperationFunc
	Rollbacks     *util.Rollbacks
}

type Activator

type Activator interface {
	Provider
	Activate(ctx context.Context, task *server.TransferOperation, table abstract.TableMap, callbacks ActivateCallbacks) error
}

Activator enable custom functionality on transfer `Activate` task.

type AsyncSinker

type AsyncSinker interface {
	Provider
	AsyncSink(middleware abstract.Middleware) (abstract.AsyncSink, error)
}

AsyncSinker add ability to setup async-sink instead of sync-sink for provider

type Cleaner

type Cleaner interface {
	cleanup.Closeable
	CleanupTmp(ctx context.Context, transferID string, tmpPolicy *server.TmpPolicyConfig) error
}

type Cleanuper

type Cleanuper interface {
	Provider
	Cleanup(ctx context.Context, task *server.TransferOperation) error
}

Cleanuper enable custom functionality on transfer `Activate`/`Upload`/`Reupload` tasks on `Cleanup` stage.

type Deactivator

type Deactivator interface {
	Provider
	Deactivate(ctx context.Context, task *server.TransferOperation) error
}

Deactivator enable custom functionality on transfer `Deactivate` task.

type Provider

type Provider interface {
	Type() abstract.ProviderType
}

Provider is a bare minimal implementation of provider that can do anything except existing.

type ProviderFactory

type ProviderFactory func(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, transfer *server.Transfer) Provider

type Replication

type Replication interface {
	Provider
	Source() (abstract.Source, error)
}

Replication add to provider `abstract.Source` factory to provider. this means that provider can do data replication

type Sampleable

type Sampleable interface {
	Provider
	SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)
	DestinationSampleableStorage() (abstract.SampleableStorage, error)
}

Sampleable add ability to run `Checksum` to provider.

type Sinker

type Sinker interface {
	Provider
	Sink(config middlewares.Config) (abstract.Sinker, error)
}

Sinker add generic writer factory to provider

type Snapshot

type Snapshot interface {
	Provider
	Storage() (abstract.Storage, error)
}

Snapshot add to provider `abstract.Storage` factory to provider. this means that provider can read historycal snapshots of data

type SnapshotSinker

type SnapshotSinker interface {
	Provider
	SnapshotSink(config middlewares.Config) (abstract.Sinker, error)
}

SnapshotSinker optional separate writer for snapshots. Will always called for snapshots with all control events

type Sniffer

type Sniffer interface {
	Provider
	Sniffer(ctx context.Context) (abstract.Fetchable, error)
}

Peeker is a thing that allow to sniff data replication sample

type TMPCleaner

type TMPCleaner interface {
	Provider
	TMPCleaner(ctx context.Context, task *server.TransferOperation) (Cleaner, error)
}

TMPCleaner enable custom functionality on transfer `TMP Policy` inside `Cleanup` stage of `Activate` task.

type TablesOperationFunc

type TablesOperationFunc = func(table abstract.TableMap) error

type Tester

type Tester interface {
	Provider
	TestChecks() []abstract.CheckType // list of provider specific checks
	Test(ctx context.Context) *abstract.TestResult
}

Tester check that it's possible to execute provider with provided transfer params. Will return structured test result for that specific provider.

type Updater

type Updater interface {
	Provider
	Update(ctx context.Context, addedTables []abstract.TableDescription) error
}

Updater enable custom functionality on transfer `Update` tasks.

type Verifier

type Verifier interface {
	Provider
	Verify(ctx context.Context) error
}

Verifier check that it's possible to execute provider with provided transfer params. Will return either OK or ERROR for specific provider.

Directories

Path Synopsis
Package ch cluster - it's like stand-alone cluster with multimaster []*SinkServer - masters (AltHosts).
Package ch cluster - it's like stand-alone cluster with multimaster []*SinkServer - masters (AltHosts).
httpclient
Code generated by MockGen.
Code generated by MockGen.
Package kafka is a generated GoMock package.
Package kafka is a generated GoMock package.
s3
ydb
yt
sink
Used only in sorted_table
Used only in sorted_table

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL