Documentation ¶
Overview ¶
Package types contains data types and interfaces that define the major functional blocks of code within Replicator. The goal of placing the types into this package is to make it easy to compose functionality as the Replicator project evolves.
Index ¶
- Constants
- Variables
- type AcceptOptions
- type Accumulator
- type AccumulatorFunc
- type AnyPool
- type Authenticator
- type Batch
- type ColData
- type Counter
- type DLQ
- type DLQs
- type Deadlines
- type Lease
- type LeaseBusyError
- type Leases
- type Memo
- type MultiAcceptor
- type MultiBatch
- func (b *MultiBatch) Accumulate(table ident.Table, mut Mutation) error
- func (b *MultiBatch) Copy() *MultiBatch
- func (b *MultiBatch) CopyInto(acc Accumulator) error
- func (b *MultiBatch) Count() int
- func (b *MultiBatch) Empty() *MultiBatch
- func (b *MultiBatch) Len() int
- func (b *MultiBatch) Less(i, j int) bool
- func (b *MultiBatch) Swap(i, j int)
- type Mutation
- type PoolInfo
- type Product
- type SchemaData
- type SourcePool
- type Stager
- type Stagers
- type StagingCursor
- type StagingPool
- type StagingQuerier
- type StagingQuery
- type TableAcceptor
- type TableBatch
- type TableGroup
- type TargetPool
- type TargetQuerier
- type TargetStatements
- type TargetTx
- type TemporalAcceptor
- type TemporalBatch
- type Watcher
- type Watchers
Constants ¶
const CustomUpsert = "upsert.custom"
CustomUpsert defines a custom upsert template that will be used for the mutation. A mutation may specify a custom upsert by adding an entry to its Meta map: Meta[CustomUpsert] = "custom.template.name"
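A minimal sketch of setting the Meta entry described above. The Mutation struct here is a trimmed local stand-in for the package's type, withCustomUpsert is a hypothetical helper, and the template name is only a placeholder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CustomUpsert mirrors the constant documented above.
const CustomUpsert = "upsert.custom"

// Mutation is a trimmed, local stand-in for types.Mutation.
type Mutation struct {
	Data json.RawMessage
	Key  json.RawMessage
	Meta map[string]any
}

// withCustomUpsert is a hypothetical helper that records the template name
// in the mutation's Meta map, allocating the map if it is nil.
func withCustomUpsert(mut Mutation, template string) Mutation {
	if mut.Meta == nil {
		mut.Meta = make(map[string]any)
	}
	mut.Meta[CustomUpsert] = template
	return mut
}

func main() {
	mut := withCustomUpsert(Mutation{
		Data: json.RawMessage(`{"key":"hello"}`),
		Key:  json.RawMessage(`["hello"]`),
	}, "custom.template.name")
	fmt.Println(mut.Meta[CustomUpsert])
}
```

Because Meta may be nil, the helper allocates the map before writing to it.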
const ToastedColumnPlaceholder = `"__cdc__sink__toasted__"`
ToastedColumnPlaceholder is a placeholder that identifies an unchanged Postgres toasted column. It must be quoted so that it can be used in JSON columns.
Variables ¶
var (
    // ErrCancelSingleton may be returned by callbacks passed to
    // leases.Singleton to shut down cleanly.
    ErrCancelSingleton = errors.New("singleton requested cancellation")
)
Functions ¶
This section is empty.
Types ¶
type AcceptOptions ¶
type AcceptOptions struct {
TargetQuerier TargetQuerier // Override the target database access.
}
AcceptOptions is an API escape hatch to provide hints or other metadata to acceptor implementations.
func (*AcceptOptions) Copy ¶
func (o *AcceptOptions) Copy() *AcceptOptions
Copy returns a copy of the options.
type Accumulator ¶
type Accumulator interface {
    // Accumulate the mutation and ensure that batch invariants are
    // maintained.
    Accumulate(table ident.Table, mut Mutation) error
}
Accumulator contains the Accumulate method.
type AccumulatorFunc ¶
AccumulatorFunc adapts a function to the Accumulator interface.
func (AccumulatorFunc) Accumulate ¶
func (f AccumulatorFunc) Accumulate(table ident.Table, mut Mutation) error
Accumulate implements Accumulator.
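The declaration of AccumulatorFunc is not shown above. The sketch below assumes it follows the usual Go function-adapter pattern (as http.HandlerFunc does); Table and Mutation are simplified stand-ins for the real types.

```go
package main

import "fmt"

// Table and Mutation are simplified stand-ins for ident.Table and
// types.Mutation.
type Table string
type Mutation struct{ Key string }

// Accumulator matches the shape of the interface documented above.
type Accumulator interface {
	Accumulate(table Table, mut Mutation) error
}

// AccumulatorFunc is an assumed declaration: a function type whose
// Accumulate method simply calls the function itself.
type AccumulatorFunc func(Table, Mutation) error

// Accumulate implements Accumulator.
func (f AccumulatorFunc) Accumulate(table Table, mut Mutation) error {
	return f(table, mut)
}

func main() {
	counts := map[Table]int{}
	var acc Accumulator = AccumulatorFunc(func(t Table, _ Mutation) error {
		counts[t]++
		return nil
	})
	_ = acc.Accumulate("orders", Mutation{Key: "1"})
	_ = acc.Accumulate("orders", Mutation{Key: "2"})
	fmt.Println(counts["orders"])
}
```

An adapter like this lets a closure be passed wherever an Accumulator is expected, for example as the argument to a CopyInto method.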
type AnyPool ¶
type AnyPool interface {
    *SourcePool | *StagingPool | *TargetPool
    Info() *PoolInfo
}
AnyPool is a generic type constraint for any database pool type that we support.
type Authenticator ¶
type Authenticator interface {
    // Check returns true if a request containing some bearer token
    // should be allowed to operate on the given schema.
    Check(ctx context.Context, schema ident.Schema, token string) (ok bool, _ error)
}
An Authenticator determines if an operation on some schema should be allowed to proceed.
type Batch ¶
type Batch[B any] interface {
    Accumulator
    // Count returns the number of mutations contained in the Batch.
    Count() int
    // Copy returns a deep copy of the Batch.
    Copy() B
    // CopyInto copies the contents of the batch into accumulator.
    CopyInto(acc Accumulator) error
    // Empty returns a copy of the Batch, but with no enclosed
    // mutations. This is useful when wanting to transform or filter a
    // batch.
    Empty() B
}
The Batch interface is implemented by the various Batch types in this package.
type ColData ¶
type ColData struct {
    // A SQL expression to use with sparse payloads.
    DefaultExpr string
    Ignored     bool
    Name        ident.Ident
    // A Parse function may be supplied to allow a datatype
    // to be converted into a type more readily
    // used by a target database driver.
    Parse   func(any) (any, error) `json:"-"`
    Primary bool
    // Type of the column.
    Type string
}
ColData holds SQL column metadata.
type Counter ¶
type Counter interface {
Add(float64)
}
Counter tracks the number of times an operation is executed.
type DLQ ¶
type DLQ interface {
Enqueue(ctx context.Context, tx TargetQuerier, mut Mutation) error
}
A DLQ is a dead-letter queue that allows mutations to be written to the target for offline reconciliation.
type Lease ¶
type Lease interface {
    // Context will be canceled when the lease has expired.
    Context() context.Context
    // Release terminates the Lease.
    Release()
}
A Lease represents a time-based, exclusive lock.
type LeaseBusyError ¶
LeaseBusyError is returned by [Leases.Acquire] if another caller holds the lease.
func IsLeaseBusy ¶
func IsLeaseBusy(err error) (busy *LeaseBusyError, ok bool)
IsLeaseBusy returns the error if it represents a busy lease.
func (*LeaseBusyError) Error ¶
func (e *LeaseBusyError) Error() string
type Leases ¶
type Leases interface {
    // Acquire the named lease. A [LeaseBusyError] will be returned if
    // another caller has already acquired the lease.
    Acquire(ctx context.Context, name string) (Lease, error)
    // Singleton executes a callback when the named lease is acquired.
    //
    // The lease will be released in the following circumstances:
    //   * The callback function returns.
    //   * The lease cannot be renewed before it expires.
    //   * The outer context is canceled.
    //
    // If the callback returns a non-nil error, the error will be
    // logged. If the callback returns ErrCancelSingleton, it will not
    // be retried. In all other cases, the callback function is retried
    // once a lease is re-acquired.
    Singleton(ctx context.Context, name string, fn func(ctx context.Context) error)
}
Leases coordinates behavior across multiple instances of Replicator.
type Memo ¶
type Memo interface {
    // Get retrieves the value associated with the given key.
    // If the value is not found, a nil slice is returned.
    Get(ctx context.Context, tx StagingQuerier, key string) ([]byte, error)
    // Put stores a value associated with the key.
    Put(ctx context.Context, tx StagingQuerier, key string, value []byte) error
}
A Memo is a key-value store that persists a value associated with a key.
type MultiAcceptor ¶
type MultiAcceptor interface {
    TemporalAcceptor
    // AcceptMultiBatch processes the batch. The options may be nil.
    AcceptMultiBatch(context.Context, *MultiBatch, *AcceptOptions) error
}
A MultiAcceptor operates on a MultiBatch to achieve some effect within the target.
func CountingAcceptor ¶
func CountingAcceptor(
    delegate MultiAcceptor,
    errors Counter,
    received Counter,
    successes Counter,
) MultiAcceptor
CountingAcceptor instantiates a new acceptor that records top-of-funnel mutation counts, classifies them based on success or failure, and then delegates to another acceptor.
func OrderedAcceptorFrom ¶
func OrderedAcceptorFrom(acc TableAcceptor, watchers Watchers) MultiAcceptor
OrderedAcceptorFrom will return an adaptor which iterates over tables in the table-dependency order. The returned acceptor will respect the [AcceptOptions.TableOrder] when iterating within a TemporalBatch.
type MultiBatch ¶
type MultiBatch struct {
    ByTime map[hlc.Time]*TemporalBatch // Time-based indexing.
    Data   []*TemporalBatch            // Time-ordered indexing.
    // contains filtered or unexported fields
}
A MultiBatch is a time-ordered collection of per-table data to apply. This represents the broadest scope of applying data, as it covers both time- and table-space and likely represents any number of source database transactions.
A well-formed MultiBatch will have its Data field sorted by time. MultiBatch implements sort.Interface to make this simple.
func (*MultiBatch) Accumulate ¶
func (b *MultiBatch) Accumulate(table ident.Table, mut Mutation) error
Accumulate adds a mutation to the MultiBatch while maintaining the order invariants.
func (*MultiBatch) Copy ¶
func (b *MultiBatch) Copy() *MultiBatch
Copy returns a deep copy of the MultiBatch.
func (*MultiBatch) CopyInto ¶
func (b *MultiBatch) CopyInto(acc Accumulator) error
CopyInto copies the batch into the Accumulator. The data will be ordered by time, table, and key.
func (*MultiBatch) Count ¶
func (b *MultiBatch) Count() int
Count returns the number of enclosed mutations.
func (*MultiBatch) Empty ¶
func (b *MultiBatch) Empty() *MultiBatch
Empty returns an empty MultiBatch.
type Mutation ¶
type Mutation struct {
    Before json.RawMessage // Optional encoded JSON object
    Data   json.RawMessage // An encoded JSON object: { "key" : "hello" }
    Key    json.RawMessage // An encoded JSON array: [ "hello" ]
    Meta   map[string]any  // Dialect-specific data, may be nil, not persisted
    Time   hlc.Time        // The effective time of the mutation
}
A Mutation describes a row to upsert into the target database. That is, it is a collection of column values to apply to a row in some table.
type PoolInfo ¶
type PoolInfo struct {
    ConnectionString string
    Product          Product
    Version          string

    // ErrCode returns a database error code, if err is of the pool's
    // underlying driver error type.
    ErrCode func(err error) (string, bool) `json:"-"`
    // HintNoFTS decorates the table name to prevent CockroachDB from
    // generating an UPSERT (or other) plan that may involve a full
    // table scan. For other databases or versions of CockroachDB that
    // do not support this hint, this function returns an unhinted table
    // name.
    //
    // https://www.cockroachlabs.com/docs/stable/table-expressions#prevent-full-scan
    // https://github.com/cockroachdb/cockroach/issues/98211
    HintNoFTS func(table ident.Table) *ident.Hinted[ident.Table] `json:"-"`
    // IsDeferrable returns true if the error might clear if the work is
    // tried at a future point in time (e.g.: foreign key dependencies).
    IsDeferrable func(err error) bool `json:"-"`
    // ShouldRetry returns true if the error should be retried
    // immediately (e.g.: serializable isolation failure).
    ShouldRetry func(err error) bool `json:"-"`
}
PoolInfo describes a database connection pool and what it's connected to.
type Product ¶
type Product int
Product is an enum type to make it easy to switch on the underlying database.
const (
    ProductUnknown Product = iota
    ProductCockroachDB
    ProductMariaDB
    ProductMySQL
    ProductOracle
    ProductPostgreSQL
)
These are various product types that we support.
func (Product) ExpandSchema ¶
ExpandSchema validates a Schema against the expected form used by the database. This method may return an alternate value to add missing default namespace elements.
type SchemaData ¶
type SchemaData struct {
    Columns *ident.TableMap[[]ColData]

    // Order is a two-level slice that represents equivalency-groups
    // with respect to table foreign-key ordering. That is, if all
    // updates for tables in Order[N] are applied, then updates in
    // Order[N+1] can then be applied.
    //
    // The need for this data can be revisited if CRDB adds support
    // for deferrable foreign-key constraints:
    // https://github.com/cockroachdb/cockroach/issues/31632
    Order [][]ident.Table
}
SchemaData holds SQL schema metadata.
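The two-level Order slice can be walked level by level so that referenced tables are handled before the tables that depend on them. A minimal sketch, assuming a string stand-in for ident.Table and a hypothetical applyInOrder helper:

```go
package main

import "fmt"

// Table is a stand-in for ident.Table.
type Table string

// applyInOrder visits each dependency level in sequence. Every table in
// order[n] is handled before any table in order[n+1].
func applyInOrder(order [][]Table, apply func(Table) error) error {
	for level, tables := range order {
		for _, tbl := range tables {
			if err := apply(tbl); err != nil {
				return fmt.Errorf("level %d, table %s: %w", level, tbl, err)
			}
		}
	}
	return nil
}

func main() {
	order := [][]Table{{"customers", "products"}, {"orders"}, {"order_items"}}
	_ = applyInOrder(order, func(t Table) error {
		fmt.Println("applying", t)
		return nil
	})
}
```

Tables within one level have no ordering constraint relative to each other, so an implementation could also process a level concurrently.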
func (*SchemaData) OriginalName ¶
OriginalName returns the name of the table as it is defined in the underlying database.
type SourcePool ¶
SourcePool is an injection point for a connection to a source database.
type Stager ¶
type Stager interface {
    // CheckConsistency scans the staging table to ensure that it is
    // internally consistent. The number of inconsistent staging table
    // entries will be returned.
    CheckConsistency(ctx context.Context, db StagingQuerier, muts []Mutation, followerRead bool) (int, error)
    // MarkApplied will mark the given mutations as having been applied.
    // This is used with lease-based unstaging or when certain mutations
    // should be skipped.
    MarkApplied(ctx context.Context, db StagingQuerier, muts []Mutation) error
    // Retire will delete staged mutations whose timestamp is less than
    // or equal to the given end time. Note that this call may take an
    // arbitrarily long amount of time to complete and its effects may
    // not occur within a single database transaction.
    Retire(ctx context.Context, db StagingQuerier, end hlc.Time) error
    // Stage writes the mutations into the staging table. This method is
    // idempotent.
    Stage(ctx context.Context, db StagingQuerier, muts []Mutation) error
    // StageIfExists will stage a mutation only if there is already a
    // mutation staged for its key. It returns a filtered copy of the
    // mutations that were not staged. This method is used to implement
    // the non-transactional mode, where we try to apply a mutation to
    // some key if there isn't already a mutation queued for that key.
    StageIfExists(ctx context.Context, db StagingQuerier, muts []Mutation) ([]Mutation, error)
}
Stager describes a service which can durably persist some number of Mutations.
type Stagers ¶
type Stagers interface {
    Get(ctx context.Context, target ident.Table) (Stager, error)
    // Read provides access to joined staging data. Because this can
    // be a potentially expensive or otherwise unbounded amount of data,
    // the results are provided via a channel which may be incrementally
    // consumed from buffered data. Results will be provided in temporal
    // order.
    //
    // Any errors encountered while reading will be returned in the
    // final message before closing the channel.
    //
    // Care should be taken to [stopper.Context.Stop] the context passed
    // into this method to prevent goroutine or database leaks. When the
    // context is gracefully stopped, the channel will be closed
    // normally.
    Read(ctx *stopper.Context, q *StagingQuery) (<-chan *StagingCursor, error)
}
Stagers is a factory for Stager instances.
type StagingCursor ¶
type StagingCursor struct {
    // A batch of data, corresponding to a transaction in the source
    // database. This may be nil for a progress-only update.
    Batch *TemporalBatch
    // This field will be populated if the reader encounters an
    // unrecoverable error while processing. A result containing an
    // error will be the final message in the channel before it is
    // closed.
    Error error
    // Fragment will be set if the Batch is not guaranteed to contain
    // all data for its given timestamp. This will occur, for example,
    // if the number of mutations for the timestamp exceeds
    // [StagingQuery.FragmentSize] or if an underlying database query is
    // not guaranteed to have yet read all values at the batch's
    // time (e.g. scan boundary alignment). Consumers that require
    // transactionally-consistent views of the data should wait for the
    // next, non-fragmented, cursor update.
    Fragment bool
    // Jump indicates that the scanning bounds changed such that the
    // data in the stream may be disjoint.
    Jump bool
    // Progress indicates the range of data which has been successfully
    // scanned so far. Receivers may encounter progress-only updates
    // which happen when the end of the scanned bounds have been reached
    // or if there is a "pipeline bubble" when reading data from the
    // staging tables.
    Progress hlc.Range
}
StagingCursor is emitted by [Stagers.Read].
func (*StagingCursor) String ¶
func (c *StagingCursor) String() string
String is for debugging use only.
type StagingPool ¶
StagingPool is an injection point for a connection to the staging database.
type StagingQuerier ¶
type StagingQuerier interface {
    Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
    Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error)
    QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) pgx.Row
}
StagingQuerier is implemented by pgxpool.Pool, pgxpool.Conn, pgxpool.Tx, pgx.Conn, and pgx.Tx types. This allows a degree of flexibility in defining types that require a database connection.
type StagingQuery ¶
type StagingQuery struct {
    // The bounds variable governs the reader to ensure that it does not
    // read beyond the data known to be consistent, by pausing reads
    // when the maximum time has been reached. Updates to the minimum
    // bound allow records to be skipped, but a query cannot be rewound
    // such that it will re-emit records.
    Bounds *notify.Var[hlc.Range]
    // FragmentSize places an upper bound on the size of any individual
    // cursor entry. If a batch has exceeded the requested fragment size,
    // the [StagingCursor.Fragment] flag will be set.
    FragmentSize int
    // The tables to query.
    Group *TableGroup
}
StagingQuery is passed to [Stagers.Read].
type TableAcceptor ¶
type TableAcceptor interface {
    // AcceptTableBatch processes the batch. The options may be nil.
    AcceptTableBatch(context.Context, *TableBatch, *AcceptOptions) error
}
A TableAcceptor operates on a TableBatch.
type TableBatch ¶
type TableBatch struct {
    Data  []Mutation
    Table ident.Table
    Time  hlc.Time
    // contains filtered or unexported fields
}
A TableBatch contains mutations destined for a single table at a single timestamp. This likely corresponds to some part of a larger transaction.
func (*TableBatch) Accumulate ¶
func (b *TableBatch) Accumulate(table ident.Table, mut Mutation) error
Accumulate a mutation.
func (*TableBatch) Copy ¶
func (b *TableBatch) Copy() *TableBatch
Copy returns a deep copy of the batch.
func (*TableBatch) CopyInto ¶
func (b *TableBatch) CopyInto(acc Accumulator) error
CopyInto copies the batch into the Accumulator ordered by key.
func (*TableBatch) Count ¶
func (b *TableBatch) Count() int
Count returns the number of enclosed mutations.
func (*TableBatch) Empty ¶
func (b *TableBatch) Empty() *TableBatch
Empty returns a TableBatch with the original metadata, but no data.
type TableGroup ¶
type TableGroup struct {
    // A (globally-unique) name for the group that is used for
    // mutual-exclusion. It is often, but not required to be, the
    // enclosing database schema for the table.
    Name ident.Ident
    // Optional. If present, this field indicates that the group
    // comprises the entire schema, implying known and unknown tables.
    Enclosing ident.Schema
    // The tables which comprise the group.
    Tables []ident.Table
}
A TableGroup is a named collection of tables. This is properly called a "schema", but that noun is overloaded.
func (*TableGroup) Copy ¶
func (g *TableGroup) Copy() *TableGroup
Copy returns a deep copy of the TableGroup.
type TargetPool ¶
TargetPool is an injection point for a connection to the target database.
type TargetQuerier ¶
type TargetQuerier interface {
    ExecContext(ctx context.Context, sql string, arguments ...interface{}) (sql.Result, error)
    QueryContext(ctx context.Context, sql string, optionsAndArgs ...interface{}) (*sql.Rows, error)
    QueryRowContext(ctx context.Context, sql string, optionsAndArgs ...interface{}) *sql.Row
}
type TargetStatements ¶
TargetStatements is an injection point for a cache of prepared statements associated with the TargetPool.
type TargetTx ¶
type TargetTx interface {
    TargetQuerier
    Commit() error
    Rollback() error
}
TargetTx is implemented by sql.Tx.
type TemporalAcceptor ¶
type TemporalAcceptor interface {
    TableAcceptor
    // AcceptTemporalBatch processes the batch. The options may be nil.
    AcceptTemporalBatch(context.Context, *TemporalBatch, *AcceptOptions) error
}
A TemporalAcceptor operates on a batch of data that has a single timestamp (i.e. a source database transaction).
type TemporalBatch ¶
type TemporalBatch struct {
    Time hlc.Time
    Data ident.TableMap[*TableBatch]
    // contains filtered or unexported fields
}
A TemporalBatch holds mutations for some number of tables that all occur at the same time. This likely corresponds to a single source transaction.
func (*TemporalBatch) Accumulate ¶
func (b *TemporalBatch) Accumulate(table ident.Table, mut Mutation) error
Accumulate adds a mutation for the given table to the batch.
func (*TemporalBatch) Copy ¶
func (b *TemporalBatch) Copy() *TemporalBatch
Copy returns a deep copy of the TemporalBatch.
func (*TemporalBatch) CopyInto ¶
func (b *TemporalBatch) CopyInto(acc Accumulator) error
CopyInto copies the batch into the Accumulator. The data will be sorted by table name and then by key.
func (*TemporalBatch) Count ¶
func (b *TemporalBatch) Count() int
Count returns the number of enclosed mutations.
func (*TemporalBatch) Empty ¶
func (b *TemporalBatch) Empty() *TemporalBatch
Empty returns an empty TemporalBatch at the same time.
type Watcher ¶
type Watcher interface {
    // Get returns a snapshot of all tables in the target database.
    // The returned struct must not be modified.
    Get() *SchemaData
    // Refresh will force the Watcher to immediately query the database
    // for updated schema information. This is intended for testing and
    // does not need to be called in the general case.
    Refresh(context.Context, *TargetPool) error
    // Watch returns a channel that emits updated column data for the
    // given table. The channel will be closed if there
    Watch(table ident.Table) (_ <-chan []ColData, cancel func(), _ error)
}
Watcher allows table metadata to be observed.
The methods in this type return column data such that primary key columns are returned first, in their declaration order, followed by all other non-pk columns.