types

package
v0.0.0-...-4dcfcdd
Warning

This package is not in the latest version of its module.

Published: May 2, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package types contains data types and interfaces that define the major functional blocks of code within cdc-sink. The goal of placing the types into this package is to make it easy to compose functionality as the cdc-sink project evolves.

Constants

const CustomUpsert = "upsert.custom"

CustomUpsert defines a custom upsert template that will be used for the mutation. A mutation may specify a custom upsert by adding an entry to its Meta map, for example: Meta[CustomUpsert] = "custom.template.name".
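A minimal sketch of how a producer might tag a mutation with a custom upsert template. The Mutation type here is a trimmed local stand-in for the documented struct (Time is omitted), withCustomUpsert is a hypothetical helper, and "custom.template.name" is the placeholder name from the text above rather than a real template.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CustomUpsert mirrors the documented constant.
const CustomUpsert = "upsert.custom"

// Mutation is a trimmed local stand-in for types.Mutation (Time omitted).
type Mutation struct {
	Data json.RawMessage
	Key  json.RawMessage
	Meta map[string]any
}

// withCustomUpsert is hypothetical: it returns a copy of mut whose Meta
// map requests the named upsert template. The caller's map is untouched.
func withCustomUpsert(mut Mutation, template string) Mutation {
	meta := make(map[string]any, len(mut.Meta)+1)
	for k, v := range mut.Meta {
		meta[k] = v
	}
	meta[CustomUpsert] = template
	mut.Meta = meta
	return mut
}

func main() {
	mut := Mutation{
		Data: json.RawMessage(`{"key":"hello"}`),
		Key:  json.RawMessage(`["hello"]`),
	}
	mut = withCustomUpsert(mut, "custom.template.name")
	fmt.Println(mut.Meta[CustomUpsert]) // custom.template.name
}
```

Copying the map avoids surprising other holders of the original Mutation, since Go maps are reference types.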

const ToastedColumnPlaceholder = `"__cdc__sink__toasted__"`

ToastedColumnPlaceholder is a placeholder that identifies an unchanged Postgres TOASTed column. It must be quoted, making it a valid JSON string, so that it can be used in JSON columns.

Variables

var (
	// ErrCancelSingleton may be returned by callbacks passed to
	// leases.Singleton to shut down cleanly.
	ErrCancelSingleton = errors.New("singleton requested cancellation")
)

Functions

This section is empty.

Types

type AcceptOptions

type AcceptOptions struct {
	TargetQuerier TargetQuerier // Override the target database access.
}

AcceptOptions is an API escape hatch to provide hints or other metadata to acceptor implementations.

func (*AcceptOptions) Copy

func (o *AcceptOptions) Copy() *AcceptOptions

Copy returns a copy of the options.

type Accumulator

type Accumulator interface {
	// Accumulate the mutation and ensure that batch invariants are
	// maintained.
	Accumulate(table ident.Table, mut Mutation) error
}

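Accumulator is implemented by types, such as the batch types in this package, that collect mutations for a given table while maintaining batch invariants.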

type AccumulatorFunc

type AccumulatorFunc func(ident.Table, Mutation) error

AccumulatorFunc adapts a function to the Accumulator interface.

func (AccumulatorFunc) Accumulate

func (f AccumulatorFunc) Accumulate(table ident.Table, mut Mutation) error

Accumulate implements Accumulator.
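AccumulatorFunc follows the same adapter pattern as net/http's HandlerFunc: a plain function gains the Accumulate method. In this sketch, Table and Mutation are simplified local stand-ins for ident.Table and types.Mutation, and countPerTable is a hypothetical helper.

```go
package main

import "fmt"

// Simplified local stand-ins for ident.Table and types.Mutation.
type Table string

type Mutation struct {
	Key string
}

// Accumulator and AccumulatorFunc mirror the documented declarations.
type Accumulator interface {
	Accumulate(table Table, mut Mutation) error
}

type AccumulatorFunc func(Table, Mutation) error

// Accumulate implements Accumulator by calling the function itself.
func (f AccumulatorFunc) Accumulate(table Table, mut Mutation) error {
	return f(table, mut)
}

// countPerTable (hypothetical) returns an Accumulator that tallies
// mutations by table without defining a new named type.
func countPerTable(counts map[Table]int) Accumulator {
	return AccumulatorFunc(func(table Table, _ Mutation) error {
		counts[table]++
		return nil
	})
}

func main() {
	counts := make(map[Table]int)
	acc := countPerTable(counts)
	_ = acc.Accumulate("public.users", Mutation{Key: `[1]`})
	_ = acc.Accumulate("public.users", Mutation{Key: `[2]`})
	_ = acc.Accumulate("public.orders", Mutation{Key: `[7]`})
	fmt.Println(counts["public.users"], counts["public.orders"]) // 2 1
}
```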

type AnyPool

type AnyPool interface {
	*SourcePool | *StagingPool | *TargetPool
	Info() *PoolInfo
}

AnyPool is a generic type constraint for any database pool type that we support.
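A sketch of how such a constraint lets one generic function accept any supported pool. The pool types below are simplified local stand-ins (the real ones also embed *sql.DB or *pgxpool.Pool), and connString and the connection strings are hypothetical. Each pointer type satisfies Info() through the method promoted from the embedded PoolInfo.

```go
package main

import "fmt"

// Simplified stand-ins; the database handles are omitted.
type PoolInfo struct {
	ConnectionString string
}

// Info returns the PoolInfo when embedded.
func (i *PoolInfo) Info() *PoolInfo { return i }

type SourcePool struct{ PoolInfo }
type StagingPool struct{ PoolInfo }
type TargetPool struct{ PoolInfo }

// AnyPool mirrors the documented constraint: a union of the pool
// pointer types, each of which must also provide Info().
type AnyPool interface {
	*SourcePool | *StagingPool | *TargetPool
	Info() *PoolInfo
}

// connString works with any supported pool type.
func connString[P AnyPool](pool P) string {
	return pool.Info().ConnectionString
}

func main() {
	src := &SourcePool{PoolInfo{ConnectionString: "postgres://source"}}
	tgt := &TargetPool{PoolInfo{ConnectionString: "postgres://target"}}
	fmt.Println(connString(src), connString(tgt))
}
```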

type Authenticator

type Authenticator interface {
	// Check returns true if a request containing some bearer token
	// should be allowed to operate on the given schema.
	Check(ctx context.Context, schema ident.Schema, token string) (ok bool, _ error)
}

An Authenticator determines if an operation on some schema should be allowed to proceed.

type Batch

type Batch[B any] interface {
	Accumulator

	// Count returns the number of mutations contained in the Batch.
	Count() int

	// Copy returns a deep copy of the Batch.
	Copy() B

	// CopyInto copies the contents of the batch into accumulator.
	CopyInto(acc Accumulator) error

	// Empty returns a copy of the Batch, but with no enclosed
	// mutations. This is useful when wanting to transform or filter a
	// batch.
	Empty() B
}

The Batch interface is implemented by the various Batch types in this package.

type ColData

type ColData struct {
	// A SQL expression to use with sparse payloads.
	DefaultExpr string
	Ignored     bool
	Name        ident.Ident
	// A Parse function may be supplied to allow a datatype
	// to be converted into a type more readily
	// used by a target database driver.
	Parse   func(any) (any, error) `json:"-"`
	Primary bool
	// Type of the column.
	Type string
}

ColData holds SQL column metadata.

func (ColData) Equal

func (d ColData) Equal(o ColData) bool

Equal returns true if the two ColData are equivalent under a case-insensitive comparison.

type Counter

type Counter interface {
	Add(float64)
}

Counter tracks the number of times an operation is executed.

type DLQ

type DLQ interface {
	Enqueue(ctx context.Context, tx TargetQuerier, mut Mutation) error
}

A DLQ is a dead-letter queue that allows mutations to be written to the target for offline reconciliation.

type DLQs

type DLQs interface {
	Get(ctx context.Context, target ident.Schema, name string) (DLQ, error)
}

DLQs provides named dead-letter queues in the target schema.

type Deadlines

type Deadlines = *ident.Map[time.Duration]

Deadlines associate a column identifier with a duration.

type Lease

type Lease interface {
	// Context will be canceled when the lease has expired.
	Context() context.Context

	// Release terminates the Lease.
	Release()
}

A Lease represents a time-based, exclusive lock.

type LeaseBusyError

type LeaseBusyError struct {
	Expiration time.Time
}

LeaseBusyError is returned by [Leases.Acquire] if another caller holds the lease.

func IsLeaseBusy

func IsLeaseBusy(err error) (busy *LeaseBusyError, ok bool)

IsLeaseBusy returns the *LeaseBusyError and true if err represents a busy lease.

func (*LeaseBusyError) Error

func (e *LeaseBusyError) Error() string

type Leases

type Leases interface {
	// Acquire the named lease. A [LeaseBusyError] will be returned if
	// another caller has already acquired the lease.
	Acquire(ctx context.Context, name string) (Lease, error)

	// Singleton executes a callback when the named lease is acquired.
	//
	// The lease will be released in the following circumstances:
	//   * The callback function returns.
	//   * The lease cannot be renewed before it expires.
	//   * The outer context is canceled.
	//
	// If the callback returns a non-nil error, the error will be
	// logged. If the callback returns ErrCancelSingleton, it will not
	// be retried. In all other cases, the callback function is retried
	// once a lease is re-acquired.
	Singleton(ctx context.Context, name string, fn func(ctx context.Context) error)
}

Leases coordinates behavior across multiple instances of cdc-sink.

type Memo

type Memo interface {
	// Get retrieves the value associated with the given key.
	// If the value is not found, a nil slice is returned.
	Get(ctx context.Context, tx StagingQuerier, key string) ([]byte, error)
	// Put stores a value associated with the key.
	Put(ctx context.Context, tx StagingQuerier, key string, value []byte) error
}

A Memo is a key-value store that persists a value associated with a key.

type MultiAcceptor

type MultiAcceptor interface {
	TemporalAcceptor
	// AcceptMultiBatch processes the batch. The options may be nil.
	AcceptMultiBatch(context.Context, *MultiBatch, *AcceptOptions) error
}

A MultiAcceptor operates on a MultiBatch to achieve some effect within the target.

func CountingAcceptor

func CountingAcceptor(
	delegate MultiAcceptor, errors Counter, received Counter, successes Counter,
) MultiAcceptor

CountingAcceptor instantiates a new acceptor that records top-of-funnel mutation counts, classifies them based on success or failure, and then delegates to another acceptor.

func OrderedAcceptorFrom

func OrderedAcceptorFrom(acc TableAcceptor, watchers Watchers) MultiAcceptor

OrderedAcceptorFrom returns an adapter that iterates over tables in table-dependency order. The returned acceptor will respect the [AcceptOptions.TableOrder] when iterating within a TemporalBatch.

type MultiBatch

type MultiBatch struct {
	ByTime map[hlc.Time]*TemporalBatch // Time-based indexing.
	Data   []*TemporalBatch            // Time-ordered index.
	// contains filtered or unexported fields
}

A MultiBatch is a time-ordered collection of per-table data to apply. This represents the broadest scope of applying data, as it covers both time- and table-space and likely represents any number of source database transactions.

A well-formed MultiBatch will have its Data field sorted by time. MultiBatch implements sort.Interface to make this simple.

func (*MultiBatch) Accumulate

func (b *MultiBatch) Accumulate(table ident.Table, mut Mutation) error

Accumulate adds a mutation to the MultiBatch while maintaining the order invariants.

func (*MultiBatch) Copy

func (b *MultiBatch) Copy() *MultiBatch

Copy returns a deep copy of the MultiBatch.

func (*MultiBatch) CopyInto

func (b *MultiBatch) CopyInto(acc Accumulator) error

CopyInto copies the batch into the Accumulator. The data will be ordered by time, table, and key.

func (*MultiBatch) Count

func (b *MultiBatch) Count() int

Count returns the number of enclosed mutations.

func (*MultiBatch) Empty

func (b *MultiBatch) Empty() *MultiBatch

Empty returns an empty MultiBatch.

func (*MultiBatch) Len

func (b *MultiBatch) Len() int

Len implements sort.Interface.

func (*MultiBatch) Less

func (b *MultiBatch) Less(i, j int) bool

Less implements sort.Interface.

func (*MultiBatch) Swap

func (b *MultiBatch) Swap(i, j int)

Swap implements sort.Interface.

type Mutation

type Mutation struct {
	Before json.RawMessage // Optional encoded JSON object
	Data   json.RawMessage // An encoded JSON object: { "key" : "hello" }
	Key    json.RawMessage // An encoded JSON array: [ "hello" ]
	Meta   map[string]any  // Dialect-specific data, may be nil, not persisted
	Time   hlc.Time        // The effective time of the mutation
}

A Mutation describes a row to upsert into the target database. That is, it is a collection of column values to apply to a row in some table.

func Flatten

func Flatten[B any](batch Batch[B]) []Mutation

Flatten copies all mutations in a batch into a slice.
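One way to implement such a function over the generic Batch interface is to stream the batch through CopyInto into an appending AccumulatorFunc. This is a hypothetical sketch, not the package's source; Table, Mutation, and the minimal TableBatch are simplified local stand-ins.

```go
package main

import "fmt"

type Table string

type Mutation struct{ Key string }

type Accumulator interface {
	Accumulate(table Table, mut Mutation) error
}

type AccumulatorFunc func(Table, Mutation) error

func (f AccumulatorFunc) Accumulate(t Table, m Mutation) error { return f(t, m) }

// Batch mirrors the documented generic interface.
type Batch[B any] interface {
	Accumulator
	Count() int
	Copy() B
	CopyInto(acc Accumulator) error
	Empty() B
}

// TableBatch is a minimal single-table batch used only for this sketch.
type TableBatch struct {
	Table Table
	Data  []Mutation
}

func (b *TableBatch) Accumulate(_ Table, m Mutation) error {
	b.Data = append(b.Data, m)
	return nil
}
func (b *TableBatch) Count() int { return len(b.Data) }
func (b *TableBatch) Copy() *TableBatch {
	return &TableBatch{Table: b.Table, Data: append([]Mutation(nil), b.Data...)}
}
func (b *TableBatch) CopyInto(acc Accumulator) error {
	for _, m := range b.Data {
		if err := acc.Accumulate(b.Table, m); err != nil {
			return err
		}
	}
	return nil
}
func (b *TableBatch) Empty() *TableBatch { return &TableBatch{Table: b.Table} }

// flatten is a hypothetical implementation of Flatten.
func flatten[B any](batch Batch[B]) []Mutation {
	ret := make([]Mutation, 0, batch.Count())
	// The appending accumulator never fails, so the error is ignored.
	_ = batch.CopyInto(AccumulatorFunc(func(_ Table, m Mutation) error {
		ret = append(ret, m)
		return nil
	}))
	return ret
}

func main() {
	b := &TableBatch{Table: "public.users"}
	_ = b.Accumulate(b.Table, Mutation{Key: `[1]`})
	_ = b.Accumulate(b.Table, Mutation{Key: `[2]`})
	fmt.Println(len(flatten[*TableBatch](b))) // 2
}
```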

func (Mutation) IsDelete

func (m Mutation) IsDelete() bool

IsDelete returns true if the Mutation represents a deletion.

type PoolInfo

type PoolInfo struct {
	ConnectionString string
	Product          Product
	Version          string

	// ErrCode returns a database error code, if err is of the pool's
	// underlying driver error type.
	ErrCode func(err error) (string, bool) `json:"-"`
	// HintNoFTS decorates the table name to prevent CockroachDB from
	// generating an UPSERT (or other) plan that may involve a full
	// table scan. For other databases or versions of CockroachDB that
	// do not support this hint, this function returns an unhinted table
	// name.
	//
	// https://www.cockroachlabs.com/docs/stable/table-expressions#prevent-full-scan
	// https://github.com/cockroachdb/cockroach/issues/98211
	HintNoFTS func(table ident.Table) *ident.Hinted[ident.Table] `json:"-"`
	// IsDeferrable returns true if the error might clear if the work is
	// tried at a future point in time (e.g.: foreign key dependencies).
	IsDeferrable func(err error) bool `json:"-"`
	// ShouldRetry returns true if the error should be retried
	// immediately (e.g.: serializable isolation failure).
	ShouldRetry func(err error) bool `json:"-"`
}

PoolInfo describes a database connection pool and what it's connected to.

func (*PoolInfo) Info

func (i *PoolInfo) Info() *PoolInfo

Info returns the PoolInfo when embedded.

type Product

type Product int

Product is an enum type to make it easy to switch on the underlying database.

const (
	ProductUnknown Product = iota
	ProductCockroachDB
	ProductMariaDB
	ProductMySQL
	ProductOracle
	ProductPostgreSQL
)

These are the database products that are supported.

func (Product) ExpandSchema

func (p Product) ExpandSchema(s ident.Schema) (ident.Schema, error)

ExpandSchema validates a Schema against the expected form used by the database. This method may return an alternate value to add missing default namespace elements.

func (Product) String

func (i Product) String() string

type SchemaData

type SchemaData struct {
	Columns *ident.TableMap[[]ColData]

	// Order is a two-level slice that represents equivalency-groups
	// with respect to table foreign-key ordering. That is, if all
	// updates for tables in Order[N] are applied, then updates in
	// Order[N+1] can then be applied.
	//
	// The need for this data can be revisited if CRDB adds support
	// for deferrable foreign-key constraints:
	// https://github.com/cockroachdb/cockroach/issues/31632
	Order [][]ident.Table
}

SchemaData holds SQL schema metadata.

func (*SchemaData) OriginalName

func (s *SchemaData) OriginalName(tbl ident.Table) (ident.Table, bool)

OriginalName returns the name of the table as it is defined in the underlying database.

type SourcePool

type SourcePool struct {
	*sql.DB
	PoolInfo
	// contains filtered or unexported fields
}

SourcePool is an injection point for a connection to a source database.

type Stager

type Stager interface {
	// MarkApplied will mark the given mutations as having been applied.
	// This is used with lease-based unstaging or when certain mutations
	// should be skipped.
	MarkApplied(ctx context.Context, db StagingQuerier, muts []Mutation) error

	// Retire will delete staged mutations whose timestamp is less than
	// or equal to the given end time. Note that this call may take an
	// arbitrarily long amount of time to complete and its effects may
	// not occur within a single database transaction.
	Retire(ctx context.Context, db StagingQuerier, end hlc.Time) error

	// Stage writes the mutations into the staging table. This method is
	// idempotent.
	Stage(ctx context.Context, db StagingQuerier, muts []Mutation) error

	// StageIfExists will stage a mutation only if there is already a
	// mutation staged for its key. It returns a filtered copy of the
	// mutations that were not staged. This method is used to implement
	// the non-transactional mode, where we try to apply a mutation to
	// some key if there isn't already a mutation queued for that key.
	StageIfExists(ctx context.Context, db StagingQuerier, muts []Mutation) ([]Mutation, error)
}

Stager describes a service which can durably persist some number of Mutations.
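The StageIfExists contract can be illustrated with an in-memory model: a mutation is queued only behind existing staged work for its key, and everything else is returned to the caller for direct application. memStager is a hypothetical stand-in that, unlike the real Stage, is neither durable nor idempotent.

```go
package main

import "fmt"

// Mutation is reduced to its key for this sketch.
type Mutation struct{ Key string }

// memStager is a hypothetical in-memory stand-in, keyed by mutation key.
type memStager struct {
	staged map[string][]Mutation
}

// stage unconditionally queues the mutations.
func (s *memStager) stage(muts []Mutation) {
	for _, m := range muts {
		s.staged[m.Key] = append(s.staged[m.Key], m)
	}
}

// stageIfExists queues a mutation only if its key already has staged
// data, and returns the mutations that were not staged.
func (s *memStager) stageIfExists(muts []Mutation) []Mutation {
	var unstaged []Mutation
	for _, m := range muts {
		if len(s.staged[m.Key]) > 0 {
			s.staged[m.Key] = append(s.staged[m.Key], m)
		} else {
			unstaged = append(unstaged, m)
		}
	}
	return unstaged
}

func main() {
	s := &memStager{staged: map[string][]Mutation{}}
	// Suppose key "a" already has a queued mutation.
	s.stage([]Mutation{{Key: "a"}})

	// Only mutations for keys without queued work may be applied
	// directly; the rest wait behind the existing queue.
	direct := s.stageIfExists([]Mutation{{Key: "a"}, {Key: "b"}})
	fmt.Println(len(direct), direct[0].Key, len(s.staged["a"])) // 1 b 2
}
```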

type Stagers

type Stagers interface {
	Get(ctx context.Context, target ident.Table) (Stager, error)

	// Read provides access to joined staging data. Because this can
	// be a potentially expensive or otherwise unbounded amount of data,
	// the results are provided via a channel which may be incrementally
	// consumed from buffered data. Results will be provided in temporal
	// order.
	//
	// Any errors encountered while reading will be returned in the
	// final message before closing the channel.
	//
	// Care should be taken to [stopper.Context.Stop] the context passed
	// into this method to prevent goroutine or database leaks. When the
	// context is gracefully stopped, the channel will be closed
	// normally.
	Read(ctx *stopper.Context, q *StagingQuery) (<-chan *StagingCursor, error)
}

Stagers is a factory for Stager instances.

type StagingCursor

type StagingCursor struct {
	// A batch of data, corresponding to a transaction in the source
	// database. This may be nil for a progress-only update.
	Batch *TemporalBatch

	// This field will be populated if the reader encounters an
	// unrecoverable error while processing. A result containing an
	// error will be the final message in the channel before it is
	// closed.
	Error error

	// Fragment will be set if the Batch is not guaranteed to contain
	// all data for its given timestamp. This will occur, for example,
	// if the number of mutations for the timestamp exceeds
	// [StagingQuery.FragmentSize] or if an underlying database query is
	// not guaranteed to have yet read all values at the batch's
	// time (e.g. scan boundary alignment). Consumers that require
	// transactionally-consistent views of the data should wait for the
	// next, non-fragmented, cursor update.
	Fragment bool

	// Jump indicates that the scanning bounds changed such that the
	// data in the stream may be disjoint.
	Jump bool

	// Progress indicates the range of data which has been successfully
	// scanned so far. Receivers may encounter progress-only updates
	// which happen when the end of the scanned bounds have been reached
	// or if there is a "pipeline bubble" when reading data from the
	// staging tables.
	Progress hlc.Range
}

StagingCursor is emitted by [Stagers.Read].
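A sketch of a consumer that needs transactionally consistent data. It skips progress-only cursors, buffers fragments, and applies data only when a non-fragmented cursor arrives, stopping at the final error message. Cursor and Batch are simplified, hypothetical stand-ins (the *stopper.Context lifecycle and Progress are not modeled), and treating successive fragments as pieces of one timestamp is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical simplified stand-ins for TemporalBatch and StagingCursor.
type Batch struct{ Muts []string }

type Cursor struct {
	Batch    *Batch
	Error    error
	Fragment bool
}

// consume drains the channel, merging fragments before calling apply.
func consume(ch <-chan *Cursor, apply func([]string) error) error {
	var pending []string
	for cur := range ch {
		if cur.Error != nil {
			return cur.Error // documented to be the final message
		}
		if cur.Batch == nil {
			continue // progress-only update
		}
		pending = append(pending, cur.Batch.Muts...)
		if cur.Fragment {
			continue // wait for the rest of this timestamp
		}
		if err := apply(pending); err != nil {
			return err
		}
		pending = nil
	}
	return nil
}

func main() {
	ch := make(chan *Cursor, 4)
	ch <- &Cursor{Batch: &Batch{Muts: []string{"m1", "m2"}}, Fragment: true}
	ch <- &Cursor{} // progress-only
	ch <- &Cursor{Batch: &Batch{Muts: []string{"m3"}}}
	ch <- &Cursor{Error: errors.New("scan failed")}
	close(ch)
	err := consume(ch, func(muts []string) error {
		fmt.Println("apply", muts)
		return nil
	})
	fmt.Println(err)
	// apply [m1 m2 m3]
	// scan failed
}
```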

func (*StagingCursor) String

func (c *StagingCursor) String() string

String is for debugging use only.

type StagingPool

type StagingPool struct {
	*pgxpool.Pool
	PoolInfo
	// contains filtered or unexported fields
}

StagingPool is an injection point for a connection to the staging database.

type StagingQuerier

type StagingQuerier interface {
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) pgx.Row
}

StagingQuerier is implemented by pgxpool.Pool, pgxpool.Conn, pgxpool.Tx, pgx.Conn, and pgx.Tx types. This allows a degree of flexibility in defining types that require a database connection.

type StagingQuery

type StagingQuery struct {
	// The bounds variable governs the reader to ensure that it does not
	// read beyond the data known to be consistent, by pausing reads
	// when the maximum time has been reached. The minimum value of the
	// bounds may be updated to optimize database scans.
	Bounds *notify.Var[hlc.Range]

	// FragmentSize places an upper bound on the size of any individual
	// cursor entry. If a batch has exceeded the requested fragment size,
	// the [StagingCursor.Fragment] flag will be set.
	FragmentSize int

	// The tables to query.
	Group *TableGroup
}

StagingQuery is passed to [Stagers.Read].

type TableAcceptor

type TableAcceptor interface {
	// AcceptTableBatch processes the batch. The options may be nil.
	AcceptTableBatch(context.Context, *TableBatch, *AcceptOptions) error
}

A TableAcceptor operates on a TableBatch.

type TableBatch

type TableBatch struct {
	Data  []Mutation
	Table ident.Table
	Time  hlc.Time
	// contains filtered or unexported fields
}

A TableBatch contains mutations destined for a single table at a single timestamp. This likely corresponds to some part of a larger transaction.

func (*TableBatch) Accumulate

func (b *TableBatch) Accumulate(table ident.Table, mut Mutation) error

Accumulate adds a mutation to the batch.

func (*TableBatch) Copy

func (b *TableBatch) Copy() *TableBatch

Copy returns a deep copy of the batch.

func (*TableBatch) CopyInto

func (b *TableBatch) CopyInto(acc Accumulator) error

CopyInto copies the batch into the Accumulator ordered by key.

func (*TableBatch) Count

func (b *TableBatch) Count() int

Count returns the number of enclosed mutations.

func (*TableBatch) Empty

func (b *TableBatch) Empty() *TableBatch

Empty returns a TableBatch with the original metadata, but no data.

type TableGroup

type TableGroup struct {
	// A (globally-unique) name for the group that is used for
	// mutual-exclusion. It is often, but not required to be, the
	// enclosing database schema for the table.
	Name ident.Ident
	// Optional. If present, this field indicates that the group
	// comprises the entire schema, implying known and unknown tables.
	Enclosing ident.Schema
	// The tables which comprise the group.
	Tables []ident.Table
}

A TableGroup is a named collection of tables. This is properly called a "schema", but that noun is overloaded.

func (*TableGroup) Copy

func (g *TableGroup) Copy() *TableGroup

Copy returns a deep copy of the TableGroup.

func (*TableGroup) Schema

func (g *TableGroup) Schema() (ident.Schema, error)

Schema returns a common schema for the group, either [TableGroup.Enclosing] or a schema common to all configured tables.

func (*TableGroup) String

func (g *TableGroup) String() string

String is for debugging use only.

type TargetPool

type TargetPool struct {
	*sql.DB
	PoolInfo
	// contains filtered or unexported fields
}

TargetPool is an injection point for a connection to the target database.

type TargetQuerier

type TargetQuerier interface {
	ExecContext(ctx context.Context, sql string, arguments ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, sql string, optionsAndArgs ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, sql string, optionsAndArgs ...interface{}) *sql.Row
}

TargetQuerier is implemented by *sql.DB and *sql.Tx.

type TargetStatements

type TargetStatements struct {
	*stmtcache.Cache[string]
}

TargetStatements is an injection point for a cache of prepared statements associated with the TargetPool.

type TargetTx

type TargetTx interface {
	TargetQuerier
	Commit() error
	Rollback() error
}

TargetTx is implemented by *sql.Tx.

type TemporalAcceptor

type TemporalAcceptor interface {
	TableAcceptor
	// AcceptTemporalBatch processes the batch. The options may be nil.
	AcceptTemporalBatch(context.Context, *TemporalBatch, *AcceptOptions) error
}

A TemporalAcceptor operates on a batch of data that has a single timestamp (i.e. a source database transaction).

type TemporalBatch

type TemporalBatch struct {
	Time hlc.Time
	Data ident.TableMap[*TableBatch]
	// contains filtered or unexported fields
}

A TemporalBatch holds mutations for some number of tables that all occur at the same time. This likely corresponds to a single source transaction.

func (*TemporalBatch) Accumulate

func (b *TemporalBatch) Accumulate(table ident.Table, mut Mutation) error

Accumulate adds a mutation for the given table to the batch.

func (*TemporalBatch) Copy

func (b *TemporalBatch) Copy() *TemporalBatch

Copy returns a deep copy of the TemporalBatch.

func (*TemporalBatch) CopyInto

func (b *TemporalBatch) CopyInto(acc Accumulator) error

CopyInto copies the batch into the Accumulator. The data will be sorted by table name and then by key.

func (*TemporalBatch) Count

func (b *TemporalBatch) Count() int

Count returns the number of enclosed mutations.

func (*TemporalBatch) Empty

func (b *TemporalBatch) Empty() *TemporalBatch

Empty returns an empty TemporalBatch at the same time.

type Watcher

type Watcher interface {
	// Get returns a snapshot of all tables in the target database.
	// The returned struct must not be modified.
	Get() *SchemaData
	// Refresh will force the Watcher to immediately query the database
	// for updated schema information. This is intended for testing and
	// does not need to be called in the general case.
	Refresh(context.Context, *TargetPool) error
	// Watch returns a channel that emits updated column data for the
	// given table.  The channel will be closed if there
	Watch(table ident.Table) (_ <-chan []ColData, cancel func(), _ error)
}

Watcher allows table metadata to be observed.

The methods in this type return column data such that primary key columns are returned first, in their declaration order, followed by all other non-pk columns.

type Watchers

type Watchers interface {
	Get(db ident.Schema) (Watcher, error)
}

Watchers is a factory for Watcher instances.
