dblog

package
v0.0.0-rc6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LowWatermarkType     = "L"
	HighWatermarkType    = "H"
	SuccessWatermarkType = "S"
	BadWatermarkType     = "B"
)
View Source
const (
	FallbackChunkSize       = uint64(100_000)
	DefaultChunkSizeInBytes = uint64(10_000_000)

	AlwaysTrueWhereStatement = abstract.WhereStatement("1 = 1")
)

Variables

This section is empty.

Functions

func ConvertArrayToString

func ConvertArrayToString(array []string) (string, error)

func ConvertStringToArray

func ConvertStringToArray(jsonString string) ([]string, error)

func InferChunkSize

func InferChunkSize(storage abstract.SampleableStorage, tableID abstract.TableID, chunkSizeInBytes uint64) (uint64, error)

func MakeNextWhereStatement

func MakeNextWhereStatement(primaryKey, lowBound []string) abstract.WhereStatement

func MakeSQLTuple

func MakeSQLTuple(stringArray []string) string

func PKeysToStringArr

func PKeysToStringArr(item *abstract.ChangeItem, primaryKey []string, converter ChangeItemConverter) ([]string, error)

func ResolveChunkMapFromArr

func ResolveChunkMapFromArr(items []abstract.ChangeItem, primaryKey []string, converter ChangeItemConverter) (map[string]abstract.ChangeItem, error)

func ResolvePrimaryKeyColumns

func ResolvePrimaryKeyColumns(
	ctx context.Context,
	storage abstract.Storage,
	tableID abstract.TableID,
	IsSupportedKeyType func(keyType string) bool,
) ([]string, error)

Types

type ChangeItemConverter

type ChangeItemConverter func(val interface{}, colSchema abstract.ColSchema) (string, error)

type IncrementalAsyncSink

type IncrementalAsyncSink struct {
	// contains filtered or unexported fields
}

func NewIncrementalAsyncSink

func NewIncrementalAsyncSink(
	ctx context.Context,
	signalTable SignalTable,
	table abstract.TableID,
	tableIterator *IncrementalIterator,
	primaryKey []string,
	chunk map[string]abstract.ChangeItem,
	itemConverter ChangeItemConverter,
	stopCallback func(),
	outputPusher abstract.Pusher,
) *IncrementalAsyncSink

func (*IncrementalAsyncSink) AsyncPush

func (s *IncrementalAsyncSink) AsyncPush(items []abstract.ChangeItem) chan error

func (*IncrementalAsyncSink) Close

func (s *IncrementalAsyncSink) Close() error

type IncrementalIterator

type IncrementalIterator struct {
	LowWatermarkUUID  uuid.UUID
	HighWatermarkUUID uuid.UUID
	// contains filtered or unexported fields
}

func NewIncrementalIterator

func NewIncrementalIterator(
	storage tablequery.StorageTableQueryable,
	tableQuery *tablequery.TableQuery,
	signalTable SignalTable,
	itemConverter ChangeItemConverter,
	pkColNames []string,
	lowBound []string,
	limit uint64,
	betweenMarksOpts ...func(),
) (*IncrementalIterator, error)

func (*IncrementalIterator) Next

type MockSignalTable

type MockSignalTable struct {
	CreateWatermarkF func(ctx context.Context, tableID abstract.TableID, watermarkType WatermarkType, lowBound []string) (uuid.UUID, error)
	IsWatermarkF     func(item *abstract.ChangeItem, tableID abstract.TableID, mark uuid.UUID) (bool, WatermarkType)
}

func NewMockSignalTable

func NewMockSignalTable() *MockSignalTable

func (*MockSignalTable) CreateWatermark

func (m *MockSignalTable) CreateWatermark(ctx context.Context, tableID abstract.TableID, watermarkType WatermarkType, lowBound []string) (uuid.UUID, error)

func (*MockSignalTable) IsWatermark

func (m *MockSignalTable) IsWatermark(item *abstract.ChangeItem, tableID abstract.TableID, mark uuid.UUID) (bool, WatermarkType)

type SignalTable

type SignalTable interface {
	CreateWatermark(ctx context.Context, tableID abstract.TableID, watermarkType WatermarkType, lowBound []string) (uuid.UUID, error)
	IsWatermark(item *abstract.ChangeItem, tableID abstract.TableID, markUUID uuid.UUID) (bool, WatermarkType)
}

SignalTable is used to create watermarks in the WAL and to check whether a read item is a watermark.

It is assumed that the low watermark is placed before reading data from the table, and the high watermark after the read has finished.

When reading the WAL, there are two types of watermarks to check for (low and high):

  • low - marks the point just before we start reading from the database
  • high - means that we have completed reading

There are also two types that address additional needs (success watermark and bad watermark):

  • success - records the last successfully transferred chunk
  • bad - used to mark an invalid watermark type

type WatermarkType

type WatermarkType string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL