Documentation ¶
Index ¶
- Constants
- func RunTxnTests(fn func(TxnClient, rpc.TxnSender), opts ...TxnClientCreateOption)
- func SetupRuntimeTxnOptions(rt runtime.Runtime, m txn.TxnMode, iso txn.TxnIsolation)
- type EventType
- type Lock
- type TimestampWaiter
- type TxnClient
- type TxnClientCreateOption
- func WithEnableCNBasedConsistency() TxnClientCreateOption
- func WithEnableLeakCheck(maxActiveAges time.Duration, ...) TxnClientCreateOption
- func WithEnableRefreshExpression() TxnClientCreateOption
- func WithEnableSacrificingFreshness() TxnClientCreateOption
- func WithLockService(lockService lockservice.LockService) TxnClientCreateOption
- func WithMaxActiveTxn(n int) TxnClientCreateOption
- func WithTimestampWaiter(waiter TimestampWaiter) TxnClientCreateOption
- func WithTxnIDGenerator(generator TxnIDGenerator) TxnClientCreateOption
- func WithTxnLimit(n int) TxnClientCreateOption
- type TxnIDGenerator
- type TxnOperator
- type TxnOption
- func WithSnapshotTS(ts timestamp.Timestamp) TxnOption
- func WithTxnCNCoordinator() TxnOption
- func WithTxnCacheWrite() TxnOption
- func WithTxnCreateBy(createBy string) TxnOption
- func WithTxnDisable1PCOpt() TxnOption
- func WithTxnIsolation(value txn.TxnIsolation) TxnOption
- func WithTxnLockService(lockService lockservice.LockService) TxnOption
- func WithTxnMode(value txn.TxnMode) TxnOption
- func WithTxnReadyOnly() TxnOption
- func WithUserTxn() TxnOption
- type TxnOverview
- type Workspace
Constants ¶
const (
	// ClosedEvent txn closed event
	ClosedEvent = EventType(0)
)
Variables ¶
This section is empty.
Functions ¶
func RunTxnTests ¶ added in v0.8.0
func RunTxnTests(fn func(TxnClient, rpc.TxnSender), opts ...TxnClientCreateOption)
RunTxnTests runs txn tests.
func SetupRuntimeTxnOptions ¶ added in v0.8.0
func SetupRuntimeTxnOptions(rt runtime.Runtime, m txn.TxnMode, iso txn.TxnIsolation)
SetupRuntimeTxnOptions sets up the runtime-based txn options.
Types ¶
type Lock ¶ added in v1.0.0
type Lock struct {
	// TableID table id
	TableID uint64
	// Rows lock rows. If granularity is row, rows contains all point lock keys, otherwise rows
	// is range1start, range1end, range2start, range2end, ...
	Rows [][]byte
	// Options lock options, including lock type (row|range) and lock mode
	Options lock.LockOptions
}
Lock describes a wait lock, that is, a lock a transaction is waiting for.
type TimestampWaiter ¶ added in v0.8.0
type TimestampWaiter interface {
	// GetTimestamp gets the latest commit ts as the snapshot ts of the new txn. It keeps
	// blocking while the latest commit timestamp received from TN is less than the given value.
	GetTimestamp(context.Context, timestamp.Timestamp) (timestamp.Timestamp, error)
	// NotifyLatestCommitTS notifies the latest timestamp received from TN. An applied logtail
	// commit ts corresponds to an epoch. Whenever the logtail connection between CN and TN is
	// reset, the epoch is reset and all ts of the old epoch should be invalidated.
	NotifyLatestCommitTS(appliedTS timestamp.Timestamp)
	// Pause pauses the timestamp waiter and cancels all waiters in the timestamp waiter.
	// They will not wait for a newer timestamp anymore.
	Pause()
	// Resume resumes the cancel channel in the timestamp waiter after all transactions are
	// aborted.
	Resume()
	// CancelC returns the cancel channel of the timestamp waiter. If it is nil, the
	// logtail consumer is reconnecting to the logtail server and is aborting all transactions.
	// At this time, new transactions cannot be opened.
	CancelC() chan struct{}
	// Close closes the timestamp waiter
	Close()
}
TimestampWaiter is used to wait for the timestamp to reach a specified timestamp. In the push mode of logtail events, the TN pushes the logtail to each subscribed CN once a transaction has been committed, so a new transaction has to actually wait until (last pushed commit ts >= start ts). This is unfriendly to TP workloads, so we can give up some freshness and use the latest commit ts received from the current TN push as the start ts of the transaction, which eliminates this physical wait.
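The blocking behaviour of GetTimestamp and NotifyLatestCommitTS described above can be sketched as follows. This is a minimal illustration and not the package's implementation: tsWaiter, newTSWaiter and waitFor are hypothetical names, and timestamps are plain int64 values instead of timestamp.Timestamp.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// tsWaiter is a hypothetical, simplified stand-in for TimestampWaiter.
type tsWaiter struct {
	mu     sync.Mutex
	latest int64
	notify chan struct{} // closed and replaced whenever latest advances
}

func newTSWaiter() *tsWaiter {
	return &tsWaiter{notify: make(chan struct{})}
}

// NotifyLatestCommitTS records a newer commit ts and wakes all waiters.
func (w *tsWaiter) NotifyLatestCommitTS(ts int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if ts <= w.latest {
		return
	}
	w.latest = ts
	close(w.notify)
	w.notify = make(chan struct{})
}

// GetTimestamp blocks until the latest commit ts is >= ts, then returns it.
func (w *tsWaiter) GetTimestamp(ctx context.Context, ts int64) (int64, error) {
	for {
		w.mu.Lock()
		latest, ch := w.latest, w.notify
		w.mu.Unlock()
		if latest >= ts {
			return latest, nil
		}
		select {
		case <-ch: // latest advanced, check again
		case <-ctx.Done():
			return 0, ctx.Err()
		}
	}
}

// waitFor pushes the given commit timestamps from another goroutine and
// waits until the waiter has reached target.
func waitFor(target int64, pushed ...int64) (int64, error) {
	w := newTSWaiter()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go func() {
		for _, ts := range pushed {
			w.NotifyLatestCommitTS(ts)
		}
	}()
	return w.GetTimestamp(ctx, target)
}

func main() {
	got, err := waitFor(5, 3, 7)
	fmt.Println(got, err)
}
```

In this sketch a waiter for ts 5 ignores the push of 3 and only returns once 7 has been pushed. Epochs, Pause, Resume and CancelC are left out.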
func NewTimestampWaiter ¶ added in v0.8.0
func NewTimestampWaiter() TimestampWaiter
NewTimestampWaiter creates a timestamp waiter.
type TxnClient ¶
type TxnClient interface {
	// MinTimestamp returns the minimum active transaction timestamp
	MinTimestamp() timestamp.Timestamp
	// New returns a TxnOperator to handle read and write operations for a
	// transaction.
	New(ctx context.Context, commitTS timestamp.Timestamp, options ...TxnOption) (TxnOperator, error)
	// NewWithSnapshot creates a txn operator from a snapshot. The snapshot must
	// be from a CN coordinator txn operator.
	NewWithSnapshot(snapshot []byte) (TxnOperator, error)
	// AbortAllRunningTxn rolls back all running transactions, but still keeps their
	// workspace to avoid panic.
	AbortAllRunningTxn()
	// Close closes client.sender
	Close() error
	// WaitLogTailAppliedAt waits for the log tail to be applied at ts
	WaitLogTailAppliedAt(ctx context.Context, ts timestamp.Timestamp) (timestamp.Timestamp, error)
	// GetLatestCommitTS gets the latest commit timestamp
	GetLatestCommitTS() timestamp.Timestamp
	// SyncLatestCommitTS syncs the latest commit timestamp
	SyncLatestCommitTS(timestamp.Timestamp)
	// GetSyncLatestCommitTSTimes returns the number of times the latest commit ts was synced
	GetSyncLatestCommitTSTimes() uint64
	// Pause the txn client to prevent new txns from being created.
	Pause()
	// Resume the txn client to allow new txns to be created.
	Resume()
	// RefreshExpressionEnabled returns true if the refresh expression feature is enabled
	RefreshExpressionEnabled() bool
	// CNBasedConsistencyEnabled returns true if the cn based consistency feature is enabled
	CNBasedConsistencyEnabled() bool
	// IterTxns iterates over all txns
	IterTxns(func(TxnOverview) bool)
}
TxnClient is the transaction client, the operational entry point for transactions. Each CN node holds one instance of TxnClient.
func NewTxnClient ¶
func NewTxnClient(sender rpc.TxnSender, options ...TxnClientCreateOption) TxnClient
NewTxnClient creates a txn client with the given TxnSender and options.
type TxnClientCreateOption ¶
type TxnClientCreateOption func(*txnClient)
TxnClientCreateOption is an option applied when creating a txn client.
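TxnClientCreateOption has the shape of a functional option: a function that mutates the client being built. The sketch below shows that pattern in isolation. The client struct, its fields and the lower-case option names are hypothetical stand-ins, since the real txnClient type is unexported and its fields are not shown here.

```go
package main

import "fmt"

// client is a hypothetical stand-in for the unexported txnClient struct.
type client struct {
	maxActiveTxn int
	txnLimit     int
}

// createOption mirrors the shape of TxnClientCreateOption.
type createOption func(*client)

// withMaxActiveTxn is a hypothetical analogue of WithMaxActiveTxn.
func withMaxActiveTxn(n int) createOption {
	return func(c *client) { c.maxActiveTxn = n }
}

// withTxnLimit is a hypothetical analogue of WithTxnLimit.
func withTxnLimit(n int) createOption {
	return func(c *client) { c.txnLimit = n }
}

// newClient applies the options in order on top of the zero-value defaults.
func newClient(opts ...createOption) *client {
	c := &client{}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newClient(withMaxActiveTxn(100), withTxnLimit(50))
	fmt.Println(c.maxActiveTxn, c.txnLimit)
}
```

Options that are not passed leave the corresponding field at its default, which is how a constructor such as NewTxnClient can take a variable number of settings without a growing parameter list.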
func WithEnableCNBasedConsistency ¶ added in v0.8.0
func WithEnableCNBasedConsistency() TxnClientCreateOption
WithEnableCNBasedConsistency lets all transactions on a CN see the writes committed by transactions before them. When this feature is enabled, the client maintains a CN-based commit timestamp, and when opening a new transaction it adjusts the transaction's snapshot timestamp to be >= lastCommitTimestamp, so that the transaction can see the writes of previously committed transactions.
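The snapshot adjustment described above amounts to taking a maximum. A minimal sketch, assuming int64 timestamps and a hypothetical cnClock type that stands in for the state the client keeps per CN:

```go
package main

import (
	"fmt"
	"sync"
)

// cnClock is a hypothetical holder of the CN-based last commit timestamp.
type cnClock struct {
	mu         sync.Mutex
	lastCommit int64
}

// onCommit records a commit timestamp, keeping the largest one seen so far.
func (c *cnClock) onCommit(ts int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if ts > c.lastCommit {
		c.lastCommit = ts
	}
}

// adjustSnapshot raises a candidate snapshot ts to at least lastCommit, so
// the new transaction can see every write committed earlier on this CN.
func (c *cnClock) adjustSnapshot(snapshot int64) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	if snapshot < c.lastCommit {
		return c.lastCommit
	}
	return snapshot
}

func main() {
	c := &cnClock{}
	c.onCommit(120)
	fmt.Println(c.adjustSnapshot(100), c.adjustSnapshot(150))
}
```

A candidate snapshot older than the last commit is raised to it, and a newer one is left unchanged.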
func WithEnableLeakCheck ¶ added in v0.8.0
func WithEnableLeakCheck(maxActiveAges time.Duration, leakHandleFunc func(txnID []byte, createAt time.Time, createBy string, counter string, lastSql string)) TxnClientCreateOption
WithEnableLeakCheck enables the txn leak check, which is used to find any txn that has not been committed or rolled back.
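One plausible reading of the leak check is sketched below, under the assumption that a txn counts as leaked once it has been active for longer than maxActiveAges, at which point the handler is called. The activeTxn type, findLeaks and demoLeaks are hypothetical, and the real handler receives more fields (counter, lastSql) than shown here.

```go
package main

import (
	"fmt"
	"time"
)

// activeTxn is a hypothetical record of a txn that is still open.
type activeTxn struct {
	id       string
	createAt time.Time
	createBy string
}

// findLeaks calls handle for every txn that has been active for longer
// than maxActiveAge at time now.
func findLeaks(now time.Time, maxActiveAge time.Duration, txns []activeTxn, handle func(activeTxn)) {
	for _, t := range txns {
		if now.Sub(t.createAt) > maxActiveAge {
			handle(t)
		}
	}
}

// demoLeaks runs the check over two txns, one old and one fresh, and
// returns the ids reported as leaked.
func demoLeaks() []string {
	now := time.Now()
	txns := []activeTxn{
		{id: "t1", createAt: now.Add(-10 * time.Minute), createBy: "session-a"},
		{id: "t2", createAt: now.Add(-time.Second), createBy: "session-b"},
	}
	var leaked []string
	findLeaks(now, time.Minute, txns, func(t activeTxn) {
		leaked = append(leaked, t.id)
	})
	return leaked
}

func main() {
	fmt.Println(demoLeaks())
}
```

The createBy value is what WithTxnCreateBy supplies, so that a reported txn can be traced back to its creator.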
func WithEnableRefreshExpression ¶ added in v0.8.0
func WithEnableRefreshExpression() TxnClientCreateOption
WithEnableRefreshExpression enables expression refresh. In RC mode, in the event of a conflict, the later transaction needs to see the latest data after the previous transaction commits. At that point we need to re-read the latest data and re-compute the expression.
func WithEnableSacrificingFreshness ¶ added in v0.8.0
func WithEnableSacrificingFreshness() TxnClientCreateOption
WithEnableSacrificingFreshness gives up some data freshness to eliminate the wait at transaction start (see TimestampWaiter). With this optimization, there are still scenarios where data consistency must be guaranteed. For example, on a database connection within a session, a later transaction must be able to read the data committed by the previous transaction. In that case it is necessary to maintain a session-level last commit time for transactions, and the start time of the next transaction cannot be less than this value.
If we need to ensure that all transactions on a CN can read the writes of previously committed transactions, we can turn this on with WithEnableCNBasedConsistency.
func WithLockService ¶ added in v0.8.0
func WithLockService(lockService lockservice.LockService) TxnClientCreateOption
WithLockService sets up the lock service.
func WithMaxActiveTxn ¶ added in v1.0.0
func WithMaxActiveTxn(n int) TxnClientCreateOption
WithMaxActiveTxn sets the maximum number of active txns on the current CN. Once the maximum is reached, new txns are added to a FIFO queue. The default is unlimited.
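The FIFO queueing behaviour can be sketched with a small single-threaded model. This is an illustration under stated assumptions, not the client's code: limiter, begin and end are hypothetical names, txns are identified by strings, and locking is omitted.

```go
package main

import "fmt"

// limiter is a hypothetical model of the max-active-txn control.
// max <= 0 means unlimited, matching the documented default.
type limiter struct {
	max    int
	active int
	queue  []string // ids of txns waiting for a free slot, oldest first
}

// begin reports whether the txn can start immediately. If the limit has
// been reached, the txn is appended to the FIFO queue instead.
func (l *limiter) begin(id string) bool {
	if l.max <= 0 || l.active < l.max {
		l.active++
		return true
	}
	l.queue = append(l.queue, id)
	return false
}

// end finishes one active txn. If txns are queued, the freed slot goes
// straight to the oldest one, whose id is returned.
func (l *limiter) end() (string, bool) {
	if len(l.queue) > 0 {
		next := l.queue[0]
		l.queue = l.queue[1:]
		return next, true
	}
	l.active--
	return "", false
}

func main() {
	l := &limiter{max: 1}
	fmt.Println(l.begin("a"), l.begin("b"), l.begin("c"))
	next, ok := l.end()
	fmt.Println(next, ok)
}
```

With a limit of one, "a" starts at once while "b" and "c" wait, and ending "a" promotes "b" before "c".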
func WithTimestampWaiter ¶ added in v0.8.0
func WithTimestampWaiter(waiter TimestampWaiter) TxnClientCreateOption
WithTimestampWaiter sets up the timestamp waiter used to get the latest applied commit timestamp from logtail.
func WithTxnIDGenerator ¶
func WithTxnIDGenerator(generator TxnIDGenerator) TxnClientCreateOption
WithTxnIDGenerator sets up the txn id generator.
func WithTxnLimit ¶ added in v1.0.0
func WithTxnLimit(n int) TxnClientCreateOption
WithTxnLimit sets flow control for transaction creation: the maximum number of transactions created per second.
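A per-second creation limit can be modelled with a fixed-window counter, as in the sketch below. This is an assumption about one way to implement such a limit, not a description of the package: the real client may block callers or use a different algorithm, and rateLimiter and allow are hypothetical names. The current time is passed in as Unix seconds to keep the example deterministic.

```go
package main

import "fmt"

// rateLimiter is a hypothetical fixed-window limiter: at most limit
// transactions may be created within the same one-second window.
type rateLimiter struct {
	limit  int
	window int64 // Unix second of the current window
	count  int   // txns created in the current window
}

// allow reports whether a txn may be created at time nowSec.
func (r *rateLimiter) allow(nowSec int64) bool {
	if nowSec != r.window {
		// A new second has started, so reset the counter.
		r.window = nowSec
		r.count = 0
	}
	if r.count >= r.limit {
		return false
	}
	r.count++
	return true
}

func main() {
	r := &rateLimiter{limit: 2}
	fmt.Println(r.allow(100), r.allow(100), r.allow(100), r.allow(101))
}
```

The third request in second 100 is refused, and the counter resets when second 101 begins.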
type TxnIDGenerator ¶
type TxnIDGenerator interface {
	// Generate returns a unique transaction id
	Generate() []byte
}
TxnIDGenerator is the txn id generator.
type TxnOperator ¶
type TxnOperator interface {
	// GetOverview returns txn overview
	GetOverview() TxnOverview
	// Txn returns the current txn metadata
	Txn() txn.TxnMeta
	// TxnRef returns pointer of current txn metadata. In RC mode, txn's snapshot ts
	// will be updated before a statement is executed.
	TxnRef() *txn.TxnMeta
	// Snapshot a snapshot of the transaction handle that can be passed around the
	// network. In some scenarios, operations of a transaction are executed on multiple
	// CN nodes for performance acceleration. But with only one CN coordinator, Snapshot
	// can be used to recover the transaction operation handle at a non-CN coordinator
	// node, or it can be used to pass information back to the transaction coordinator
	// after the non-CN coordinator completes the transaction operation.
	Snapshot() ([]byte, error)
	// UpdateSnapshot in some scenarios, we need to boost the snapshotTimestamp to eliminate
	// the w-w conflict.
	// If ts is empty, it will use the latest commit timestamp which is received from TN.
	UpdateSnapshot(ctx context.Context, ts timestamp.Timestamp) error
	// SnapshotTS returns the snapshot timestamp of the transaction.
	SnapshotTS() timestamp.Timestamp
	// Status returns the current transaction status.
	Status() txn.TxnStatus
	// ApplySnapshot CN coordinator applies a snapshot of the non-coordinator's transaction
	// operation information.
	ApplySnapshot(data []byte) error
	// Read transaction read operation, the operator routes the message based
	// on the given TN node information and waits for the read data synchronously.
	// The transaction has been aborted if ErrTxnAborted is returned.
	// After use, SendResult needs to call the Release method
	Read(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error)
	// Write transaction write operation, and the operator will record the TN
	// nodes written by the current transaction, and when it finds that multiple
	// TN nodes are written, it will start distributed transaction processing.
	// The transaction has been aborted if ErrTxnAborted is returned.
	// After use, SendResult needs to call the Release method
	Write(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error)
	// WriteAndCommit is similar to Write, but commits the transaction after the write.
	// After use, SendResult needs to call the Release method
	WriteAndCommit(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error)
	// Commit the transaction. If data has been written to multiple TN nodes, a
	// 2pc distributed transaction commit process is used.
	Commit(ctx context.Context) error
	// Rollback the transaction.
	Rollback(ctx context.Context) error
	// AddLockTable for pessimistic transactions, if the current transaction is successfully
	// locked, the metadata corresponding to the lockservice needs to be recorded to the txn, and
	// at transaction commit time, the metadata of all lock services accessed by the transaction
	// will be committed to tn to check. If the metadata of the lockservice changes in [lock, commit],
	// the transaction will be rolled back.
	AddLockTable(locktable lock.LockTable) error
	// AddWaitLock adds a wait lock for the current txn
	AddWaitLock(tableID uint64, rows [][]byte, opt lock.LockOptions) uint64
	// RemoveWaitLock removes a wait lock for the current txn
	RemoveWaitLock(key uint64)
	// AddWorkspace for the transaction
	AddWorkspace(workspace Workspace)
	// GetWorkspace from the transaction
	GetWorkspace() Workspace
	ResetRetry(bool)
	IsRetry() bool
	// AppendEventCallback appends callbacks. All appended callbacks will be called sequentially
	// if the event happens.
	AppendEventCallback(event EventType, callbacks ...func(txn.TxnMeta))
	// Debug sends a debug request to TN. After use, SendResult needs to call the Release
	// method.
	Debug(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error)
	// SetLastSql sets the last sql using the txn
	SetLastSql(sql string)
}
TxnOperator is the operator for transaction clients. It handles read and write requests for transactions, and handles distributed transactions across TN nodes. Note: for an error returned by Read/Write/WriteAndCommit/Commit/Rollback, check whether it is a moerr.ErrDNShardNotFound error. If so, the TN information held is out of date and needs to be reloaded from HAKeeper.
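The error check in the note can be sketched as a small wrapper. Everything here is hypothetical: errShardNotFound stands in for the moerr.ErrDNShardNotFound check, reload stands in for refreshing TN information from HAKeeper, and retrying the operation once after the reload is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// errShardNotFound is a hypothetical sentinel used in place of the
// moerr.ErrDNShardNotFound check.
var errShardNotFound = errors.New("shard not found")

// callWithReload runs op. If op fails because the shard was not found,
// it reloads the TN information and runs op one more time.
func callWithReload(op func() error, reload func()) error {
	err := op()
	if !errors.Is(err, errShardNotFound) {
		return err
	}
	reload()
	return op()
}

// demoReload simulates an operation that fails until the stale TN
// information has been reloaded.
func demoReload() (calls, reloads int, err error) {
	stale := true
	op := func() error {
		calls++
		if stale {
			return fmt.Errorf("commit failed: %w", errShardNotFound)
		}
		return nil
	}
	reload := func() {
		reloads++
		stale = false
	}
	err = callWithReload(op, reload)
	return calls, reloads, err
}

func main() {
	fmt.Println(demoReload())
}
```

Whether a retry is safe depends on the operation, so real callers may prefer to reload and surface the error instead.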
type TxnOption ¶
type TxnOption func(*txnOperator)
TxnOption is an option for setting up a transaction.
FIXME(fagongzi): refactor TxnOption to avoid mem alloc
func WithSnapshotTS ¶ added in v0.7.0
func WithSnapshotTS(ts timestamp.Timestamp) TxnOption
WithSnapshotTS uses a specific snapshot timestamp to build the TxnOperator.
func WithTxnCNCoordinator ¶
func WithTxnCNCoordinator() TxnOption
WithTxnCNCoordinator sets the cn txn coordinator.
func WithTxnCacheWrite ¶
func WithTxnCacheWrite() TxnOption
WithTxnCacheWrite enables caching of write requests. After each Write call, the request is not sent to the TN node immediately but stored in the Coordinator's memory, and the Coordinator chooses the right time to send the cached requests. The following scenarios trigger sending the requests to TN:
- Before read: because the Coordinator is not aware of the format and content of the written data, it must send the cached write requests to the corresponding TN node each time Read is called, in order to implement "read your writes".
- Before commit: the cached write requests need to be sent to the corresponding TN node before commit.
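The two flush triggers listed above can be sketched with a toy coordinator. This is a minimal model under stated assumptions: coordinator and its methods are hypothetical, requests are plain strings, and "sent" records the order in which requests would reach the TN node.

```go
package main

import "fmt"

// coordinator is a hypothetical model of a coordinator with write caching.
type coordinator struct {
	cached []string // write requests buffered in memory
	sent   []string // requests delivered to the TN node, in order
}

// Write only buffers the request.
func (c *coordinator) Write(req string) {
	c.cached = append(c.cached, req)
}

// flush sends all buffered writes and empties the buffer.
func (c *coordinator) flush() {
	c.sent = append(c.sent, c.cached...)
	c.cached = nil
}

// Read flushes the cached writes first, so the read sees them.
func (c *coordinator) Read(req string) {
	c.flush()
	c.sent = append(c.sent, req)
}

// Commit flushes the cached writes before committing.
func (c *coordinator) Commit() {
	c.flush()
	c.sent = append(c.sent, "commit")
}

func main() {
	c := &coordinator{}
	c.Write("w1")
	c.Write("w2")
	fmt.Println(len(c.sent)) // nothing has been sent yet
	c.Read("r1")
	c.Write("w3")
	c.Commit()
	fmt.Println(c.sent)
}
```

Writes stay in memory until a Read or Commit forces them out, which preserves their order relative to the read.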
func WithTxnCreateBy ¶ added in v0.8.0
func WithTxnCreateBy(createBy string) TxnOption
WithTxnCreateBy sets the txn's create-by value, which is used when checking for leaked txns.
func WithTxnDisable1PCOpt ¶
func WithTxnDisable1PCOpt() TxnOption
WithTxnDisable1PCOpt disables the 1PC optimization for distributed transactions. By default, mo enables the 1PC optimization for distributed transactions: for write operations, if the prepares of all partitions execute successfully, the transaction is considered committed and the result is returned directly to the client. The partitions' prepared data is then committed asynchronously.
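The control flow of the 1PC optimization can be sketched as follows. This is an illustration only: partition and commit1PC are hypothetical, rollback of already prepared partitions on failure is omitted, and the returned channel exists just so the example can observe when the asynchronous commit has finished.

```go
package main

import "fmt"

// partition is a hypothetical participant in a distributed transaction.
type partition struct {
	name        string
	failPrepare bool
	prepared    bool
	committed   bool
}

// prepare marks the partition as prepared unless it is set to fail.
func (p *partition) prepare() bool {
	p.prepared = !p.failPrepare
	return p.prepared
}

// commit1PC prepares every partition. If all prepares succeed, it reports
// the transaction as committed right away and commits the prepared data in
// the background. The channel is closed once that background work is done.
func commit1PC(parts []*partition) (bool, <-chan struct{}) {
	done := make(chan struct{})
	for _, p := range parts {
		if !p.prepare() {
			close(done)
			return false, done
		}
	}
	go func() {
		defer close(done)
		for _, p := range parts {
			p.committed = true
		}
	}()
	return true, done
}

func main() {
	parts := []*partition{{name: "p1"}, {name: "p2"}}
	ok, done := commit1PC(parts)
	fmt.Println("reported committed:", ok)
	<-done
	fmt.Println("p1 committed:", parts[0].committed)
}
```

The caller gets its answer after the prepare round, without waiting for the commit round.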
func WithTxnIsolation ¶ added in v0.8.0
func WithTxnIsolation(value txn.TxnIsolation) TxnOption
WithTxnIsolation sets the txn isolation.
func WithTxnLockService ¶ added in v0.8.0
func WithTxnLockService(lockService lockservice.LockService) TxnOption
WithTxnLockService sets the txn lock service.
func WithTxnMode ¶ added in v0.8.0
func WithTxnMode(value txn.TxnMode) TxnOption
WithTxnMode sets the txn mode.
func WithUserTxn ¶ added in v1.0.0
func WithUserTxn() TxnOption
WithUserTxn sets the user transaction flag. Only user transactions are subject to the limit on the maximum number of active transactions.
type TxnOverview ¶ added in v1.0.0
type TxnOverview struct {
	// CreateAt create at
	CreateAt time.Time
	// Meta txn metadata
	Meta txn.TxnMeta
	// UserTxn true if is a user transaction
	UserTxn bool
	// WaitLocks wait locks
	WaitLocks []Lock
}
TxnOverview is an overview of a txn, including its meta and status.
type Workspace ¶ added in v0.8.0
type Workspace interface {
	// StartStatement tags that a statement is running
	StartStatement()
	// EndStatement tags that a statement is completed
	EndStatement()
	// IncrStatementID increments the executing statement id. It maintains the statement id: the first
	// statement is 1, the second is 2, and so on. In RC mode, the snapshot is updated to the latest
	// applied commit ts from TN, and the workspace updates its snapshot data for later read requests.
	IncrStatementID(ctx context.Context, commit bool) error
	// RollbackLastStatement rolls back the last statement.
	RollbackLastStatement(ctx context.Context) error
	// Adjust adjusts the workspace: it puts an update's delete+insert in the correct order and
	// merges the workspace.
	Adjust() error
	Commit(ctx context.Context) ([]txn.TxnRequest, error)
	Rollback(ctx context.Context) error
	IncrSQLCount()
	GetSQLCount() uint64
}
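The statement id bookkeeping of IncrStatementID and RollbackLastStatement can be sketched with a toy workspace. This is a simplified model, not the real workspace: the types and the Write method are hypothetical, and stepping the statement id back on rollback is an assumption of the sketch.

```go
package main

import "fmt"

// write is a hypothetical buffered write tagged with its statement id.
type write struct {
	stmt int
	row  string
}

// workspace is a hypothetical, in-memory model of a txn workspace.
type workspace struct {
	stmtID int
	writes []write
}

// IncrStatementID moves to the next statement. The first statement is 1.
func (w *workspace) IncrStatementID() {
	w.stmtID++
}

// Write buffers a row under the current statement id.
func (w *workspace) Write(row string) {
	w.writes = append(w.writes, write{stmt: w.stmtID, row: row})
}

// RollbackLastStatement drops the writes of the current statement and
// steps the statement id back by one.
func (w *workspace) RollbackLastStatement() {
	kept := make([]write, 0, len(w.writes))
	for _, x := range w.writes {
		if x.stmt != w.stmtID {
			kept = append(kept, x)
		}
	}
	w.writes = kept
	w.stmtID--
}

func main() {
	w := &workspace{}
	w.IncrStatementID()
	w.Write("a")
	w.IncrStatementID()
	w.Write("b")
	w.Write("c")
	w.RollbackLastStatement()
	fmt.Println(w.stmtID, len(w.writes), w.writes[0].row)
}
```

Only the writes of the second statement are discarded, so the transaction can continue with the first statement's write intact.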