txn

package
v4.0.2+incompatible
Published: Jul 22, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package txn implements functions for examining and processing transaction oplog entries.

Constants

This section is empty.

Variables

var ErrBufferClosed = errors.New("transaction buffer already closed")
var ErrNotTransaction = errors.New("oplog entry is not a transaction")
var ErrTxnAborted = errors.New("transaction aborted")
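
Callers can tell these sentinel errors apart with errors.Is. The following is a minimal sketch, not code from the package: the three variables are local stand-ins that mirror the declarations above, and classify is a hypothetical helper whose return labels are invented for illustration. Whether the package returns these errors directly or wrapped is not stated here; errors.Is handles both cases.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the package's sentinel errors so this
// sketch compiles on its own. Real code would use txn.ErrBufferClosed etc.
var (
	ErrBufferClosed   = errors.New("transaction buffer already closed")
	ErrNotTransaction = errors.New("oplog entry is not a transaction")
	ErrTxnAborted     = errors.New("transaction aborted")
)

// classify is a hypothetical helper that maps an error to a coarse action.
// The action names are assumptions made for this example.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotTransaction):
		return "skip"
	case errors.Is(err, ErrTxnAborted):
		return "discard"
	case errors.Is(err, ErrBufferClosed):
		return "shutdown"
	default:
		return "fail"
	}
}

func main() {
	// errors.Is also matches a sentinel that has been wrapped with %w.
	wrapped := fmt.Errorf("adding op: %w", ErrTxnAborted)
	fmt.Println(classify(wrapped))
}
```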

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Buffer stores transaction oplog entries until they are needed for committing to a destination. It includes a WaitGroup for tracking all goroutines across all transactions for use in global shutdown.

func NewBuffer

func NewBuffer() *Buffer

NewBuffer initializes a transaction oplog buffer.

func (*Buffer) AddOp

func (b *Buffer) AddOp(m Meta, op db.Oplog) error

AddOp sends a transaction oplog entry to a background goroutine (starting one for a new transaction ID) for asynchronous pre-processing and storage. If the oplog entry is not a transaction, an error will be returned. Any errors during processing can be discovered later via the error channel from `GetTxnStream`.

Must not be called concurrently with other transaction-related operations. Must not be called for a given transaction after starting to stream that transaction.

func (*Buffer) GetTxnStream

func (b *Buffer) GetTxnStream(m Meta) (<-chan db.Oplog, <-chan error)

GetTxnStream returns a channel of Oplog entries in a transaction and a channel for errors. If the buffer has been stopped, the returned op channel will be closed and the error channel will have an error on it.

Must not be called concurrently with other transaction-related operations. For a given transaction, it must not be called until after a final oplog entry has been passed to AddOp and it must not be called more than once.
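
Consuming a pair of channels like the one GetTxnStream returns can be sketched as follows. This is a sketch under stated assumptions, not the package's implementation: fakeStream is a hypothetical stand-in that yields strings rather than db.Oplog values, and it assumes the error channel is buffered and closed by the producer, which this page does not state.

```go
package main

import (
	"errors"
	"fmt"
)

// errStopped is a hypothetical error used only by this sketch.
var errStopped = errors.New("buffer stopped")

// fakeStream is a stand-in for GetTxnStream. It emits the given entries on
// the op channel, or emits nothing and reports fail on the error channel.
// Assumption: the error channel is buffered (size 1) and closed when done.
func fakeStream(entries []string, fail error) (<-chan string, <-chan error) {
	ops := make(chan string)
	errs := make(chan error, 1)
	go func() {
		// Deferred calls run last-in first-out: errs closes before ops,
		// so the error (if any) is available once ops is seen closed.
		defer close(ops)
		defer close(errs)
		if fail != nil {
			errs <- fail
			return
		}
		for _, e := range entries {
			ops <- e
		}
	}()
	return ops, errs
}

// drain reads entries until the op channel closes, then returns whatever
// is on the error channel (nil if it was closed without an error).
func drain(ops <-chan string, errs <-chan error) ([]string, error) {
	var got []string
	for op := range ops {
		got = append(got, op)
	}
	return got, <-errs
}

func main() {
	got, err := drain(fakeStream([]string{"insert", "update"}, nil))
	fmt.Println(got, err)

	got, err = drain(fakeStream(nil, errStopped))
	fmt.Println(got, err)
}
```

With the real channels, a select over both may be the safer choice, since this page does not say whether the error channel is ever closed.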

func (*Buffer) OldestTimestamp

func (b *Buffer) OldestTimestamp() primitive.Timestamp

OldestTimestamp returns the timestamp of the oldest buffered transaction, or a zero-value timestamp if no transactions are buffered. This will include committed transactions until they are purged.

func (*Buffer) PurgeTxn

func (b *Buffer) PurgeTxn(m Meta) error

PurgeTxn closes any transaction streams in progress and deletes all oplog entries associated with a transaction.

Must not be called concurrently with other transaction-related operations. For a given transaction, it must not be called until after a final oplog entry has been passed to AddOp and it must not be called more than once.

func (*Buffer) Stop

func (b *Buffer) Stop() error

Stop shuts down processing and cleans up. Subsequent calls to Stop() will return nil. All other methods will report an error once Stop has been called.

type ID

type ID struct {
	// contains filtered or unexported fields
}

ID wraps fields needed to uniquely identify a transaction for use as a map key. The 'lsid' is a string rather than bson.Raw or []byte so that this type is a valid map key.

func (ID) String

func (id ID) String() string
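
The note about storing 'lsid' as a string can be illustrated with a small sketch. The txnID type below is a hypothetical stand-in, since the real fields of ID are unexported; the txnNumber field and the newTxnID constructor are assumptions made for this example. A struct containing a []byte field cannot be used as a map key in Go, whereas a struct of strings and integers is comparable and can.

```go
package main

import "fmt"

// txnID is a hypothetical stand-in for txn.ID. Only the 'lsid' name comes
// from the documentation; txnNumber is an assumed second field.
type txnID struct {
	lsid      string // raw session ID bytes, held as a string to stay comparable
	txnNumber int64
}

// newTxnID converts the raw bytes once so the result can key a map.
func newTxnID(rawLsid []byte, n int64) txnID {
	return txnID{lsid: string(rawLsid), txnNumber: n}
}

func main() {
	counts := map[txnID]int{}
	counts[newTxnID([]byte{0x01, 0x02}, 7)]++
	counts[newTxnID([]byte{0x01, 0x02}, 7)]++ // same key as the line above
	counts[newTxnID([]byte{0x01, 0x02}, 8)]++ // different transaction number
	fmt.Println(len(counts))
}
```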

type Meta

type Meta struct {
	// contains filtered or unexported fields
}

Meta holds information extracted from an oplog entry for later routing logic. Zero value means 'not a transaction'. We store 'prevOpTime' as string so the struct is comparable.

func NewMeta

func NewMeta(op db.Oplog) (Meta, error)

NewMeta extracts transaction metadata from an oplog entry. A non-transaction will return a zero-value Meta struct, not an error.

Currently there is no way for this to error, but that may change in the future if we change the db.Oplog.Object to bson.Raw, so the API is designed with failure as a possibility.

func (Meta) IsAbort

func (m Meta) IsAbort() bool

IsAbort is true if the oplog entry had the abort command.

func (Meta) IsCommit

func (m Meta) IsCommit() bool

IsCommit is true if the oplog entry was a commit command or was the final entry of an unprepared transaction.

func (Meta) IsData

func (m Meta) IsData() bool

IsData is true if the oplog entry contains transaction data.

func (Meta) IsFinal

func (m Meta) IsFinal() bool

IsFinal is true if the oplog entry is the closing entry of a transaction, i.e. if IsAbort or IsCommit is true.

func (Meta) IsMultiOp

func (m Meta) IsMultiOp() bool

IsMultiOp is true if the oplog entry is part of a prepared and/or large transaction.

func (Meta) IsTxn

func (m Meta) IsTxn() bool

IsTxn is true if the oplog entry is part of any transaction, i.e. the lsid field exists.
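
The Meta predicates are meant for routing, and one plausible routing that respects the call-order rules documented for Buffer can be sketched as follows. This is an illustration under assumptions, not the package's or any tool's actual logic: txnMeta is a hypothetical interface listing three of the documented methods, flags is a toy implementation, and route only names the Buffer calls a caller might make rather than invoking them.

```go
package main

import "fmt"

// txnMeta is a hypothetical interface covering the Meta predicates used here.
type txnMeta interface {
	IsTxn() bool
	IsAbort() bool
	IsCommit() bool
}

// flags is a toy implementation for demonstration only.
type flags struct{ txn, abort, commit bool }

func (f flags) IsTxn() bool    { return f.txn }
func (f flags) IsAbort() bool  { return f.abort }
func (f flags) IsCommit() bool { return f.commit }

// route returns the names of the steps a caller might take for one entry.
// It follows the documented constraints: GetTxnStream and PurgeTxn come
// only after the final entry of a transaction has been passed to AddOp.
func route(m txnMeta) []string {
	switch {
	case !m.IsTxn():
		return []string{"apply directly"}
	case m.IsAbort():
		return []string{"AddOp", "PurgeTxn"}
	case m.IsCommit():
		return []string{"AddOp", "GetTxnStream", "PurgeTxn"}
	default: // a data entry in the middle of a transaction
		return []string{"AddOp"}
	}
}

func main() {
	fmt.Println(route(flags{}))
	fmt.Println(route(flags{txn: true}))
	fmt.Println(route(flags{txn: true, commit: true}))
	fmt.Println(route(flags{txn: true, abort: true}))
}
```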
