Documentation ¶
Index ¶
- Constants
- func RunTxnTests(fn func(TxnClient, rpc.TxnSender), opts ...TxnClientCreateOption)
- func SetupRuntimeTxnOptions(rt runtime.Runtime, m txn.TxnMode, iso txn.TxnIsolation)
- type DebugableTxnOperator
- type EventType
- type EventableTxnOperator
- type TimestampWaiter
- type TxnClient
- type TxnClientCreateOption
- func WithEnableCNBasedConsistency() TxnClientCreateOption
- func WithEnableLeakCheck(maxActiveAges time.Duration, ...) TxnClientCreateOption
- func WithEnableRefreshExpression() TxnClientCreateOption
- func WithEnableSacrificingFreshness() TxnClientCreateOption
- func WithLockService(lockService lockservice.LockService) TxnClientCreateOption
- func WithTimestampWaiter(waiter TimestampWaiter) TxnClientCreateOption
- func WithTxnIDGenerator(generator TxnIDGenerator) TxnClientCreateOption
- type TxnClientWithCtl
- type TxnClientWithFeature
- type TxnIDGenerator
- type TxnOperator
- type TxnOption
- func WithSnapshotTS(ts timestamp.Timestamp) TxnOption
- func WithTxnCNCoordinator() TxnOption
- func WithTxnCacheWrite() TxnOption
- func WithTxnCreateBy(createBy string) TxnOption
- func WithTxnDisable1PCOpt() TxnOption
- func WithTxnIsolation(value txn.TxnIsolation) TxnOption
- func WithTxnLockService(lockService lockservice.LockService) TxnOption
- func WithTxnMode(value txn.TxnMode) TxnOption
- func WithTxnReadyOnly() TxnOption
- type Workspace
Constants ¶
const ( // ClosedEvent txn closed event ClosedEvent = EventType(0) )
Variables ¶
This section is empty.
Functions ¶
func RunTxnTests ¶ added in v0.8.0
func RunTxnTests(fn func(TxnClient, rpc.TxnSender), opts ...TxnClientCreateOption)
RunTxnTests runs txn tests.
func SetupRuntimeTxnOptions ¶ added in v0.8.0
SetupRuntimeTxnOptions setup runtime based txn options
Types ¶
type DebugableTxnOperator ¶ added in v0.6.0
type DebugableTxnOperator interface { TxnOperator // Debug send debug request to DN, after use, SendResult needs to call the Release // method. Debug(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error) }
DebugableTxnOperator debugable txn operator
type EventableTxnOperator ¶ added in v0.8.0
type EventableTxnOperator interface { TxnOperator // AppendEventCallback append callback. All append callbacks will be called sequentially // if event happend. AppendEventCallback(event EventType, callbacks ...func(txn.TxnMeta)) }
CallbackTxnOperator callback txn operator
type TimestampWaiter ¶ added in v0.8.0
type TimestampWaiter interface { // GetTimestamp get the latest commit ts as snapshot ts of the new txn. It will keep // blocking if latest commit timestamp received from DN is less than the given value. GetTimestamp(context.Context, timestamp.Timestamp) (timestamp.Timestamp, error) // NotifyLatestCommitTS notify the latest timestamp that received from DN. A applied logtail // commit ts is corresponds to an epoch. Whenever the connection of logtail of cn and dn is // reset, the epoch will be reset and all the ts of the old epoch should be invalidated. NotifyLatestCommitTS(appliedTS timestamp.Timestamp) // Close close the timestamp waiter Close() }
TimestampWaiter is used to wait for the timestamp to reach a specified timestamp. In the Push mode of LogTail's Event, the DN pushes the logtail to the subscribed CN once a transaction has been Committed. So there is a actual wait (last push commit ts >= start ts). This is unfriendly to TP, so we can lose some freshness and use the latest commit ts received from the current DN push as the start ts of the transaction, which eliminates this physical wait.
func NewTimestampWaiter ¶ added in v0.8.0
func NewTimestampWaiter() TimestampWaiter
NewTimestampWaiter create timestamp waiter
type TxnClient ¶
type TxnClient interface { // Minimum Active Transaction Timestamp MinTimestamp() timestamp.Timestamp // New returns a TxnOperator to handle read and write operation for a // transaction. New(ctx context.Context, commitTS timestamp.Timestamp, options ...TxnOption) (TxnOperator, error) // NewWithSnapshot create a txn operator from a snapshot. The snapshot must // be from a CN coordinator txn operator. NewWithSnapshot(snapshot []byte) (TxnOperator, error) // AbortAllRunningTxn set all running txn to be aborted. AbortAllRunningTxn() // Close closes client.sender Close() error // WaitLogTailAppliedAt wait log tail applied at ts WaitLogTailAppliedAt(ctx context.Context, ts timestamp.Timestamp) (timestamp.Timestamp, error) }
TxnClient transaction client, the operational entry point for transactions. Each CN node holds one instance of TxnClient.
func NewTxnClient ¶
func NewTxnClient( sender rpc.TxnSender, options ...TxnClientCreateOption) TxnClient
NewTxnClient create a txn client with TxnSender and Options
type TxnClientCreateOption ¶
type TxnClientCreateOption func(*txnClient)
TxnClientCreateOption options for create txn
func WithEnableCNBasedConsistency ¶ added in v0.8.0
func WithEnableCNBasedConsistency() TxnClientCreateOption
WithEnableCNBasedConsistency let all transactions on a CN see writes committed by other transactions before them. When this feature is enabled, the client maintains a CN-Based commit timestamp, and when opening a new transaction, it adjusts the transaction's snapshot timestamp to at least >= lastCommitTimestamp, so that it can see the writes of the previously committed transaction
func WithEnableLeakCheck ¶ added in v0.8.0
func WithEnableLeakCheck( maxActiveAges time.Duration, leakHandleFunc func(txnID []byte, createAt time.Time, createBy string)) TxnClientCreateOption
WithEnableLeakCheck enable txn leak check. Used to found any txn is not committed or rollbacked.
func WithEnableRefreshExpression ¶ added in v0.8.0
func WithEnableRefreshExpression() TxnClientCreateOption
WithEnableRefreshExpression in RC mode, in the event of a conflict, the later transaction needs to see the latest data after the previous transaction commits. At this time we need to re-read the data, re-read the latest data, and re-compute the expression.
func WithEnableSacrificingFreshness ¶ added in v0.8.0
func WithEnableSacrificingFreshness() TxnClientCreateOption
When making this optimization, there are some scenarios where data consistency must be guaranteed, such as a database connection in a session where the latter transaction must be able to read the data committed by the previous transaction, then it is necessary to maintain a Session-level transaction last commit time, and the start time of the next transaction cannot be less than this value.
If we need to ensure that all the transactions on a CN can read the writes of the previous committed transaction, then we can use WithEenableCNBasedConsistency to turn on.
func WithLockService ¶ added in v0.8.0
func WithLockService(lockService lockservice.LockService) TxnClientCreateOption
WithLockService setup lock service
func WithTimestampWaiter ¶ added in v0.8.0
func WithTimestampWaiter(waiter TimestampWaiter) TxnClientCreateOption
WithTimestampWaiter setup timestamp waiter to get the latest applied committed timestamp from logtail.
func WithTxnIDGenerator ¶
func WithTxnIDGenerator(generator TxnIDGenerator) TxnClientCreateOption
WithTxnIDGenerator setup txn id generator
type TxnClientWithCtl ¶ added in v0.8.0
type TxnClientWithCtl interface { TxnClient // GetLatestCommitTS get latest commit timestamp GetLatestCommitTS() timestamp.Timestamp // SetLatestCommitTS set latest commit timestamp SetLatestCommitTS(timestamp.Timestamp) }
TxnClientWithCtl TxnClient to support ctl command.
type TxnClientWithFeature ¶ added in v0.8.0
type TxnClientWithFeature interface { TxnClient // Pause the txn client to prevent new txn from being created. Pause() // Resume the txn client to allow new txn to be created. Resume() // RefreshExpressionEnabled return true if refresh expression feature enabled RefreshExpressionEnabled() bool // CNBasedConsistencyEnabled return true if cn based consistency feature enabled CNBasedConsistencyEnabled() bool }
TxnClientWithFeature is similar to TxnClient, except that some methods have been added to determine whether certain features are supported.
type TxnIDGenerator ¶
type TxnIDGenerator interface { // Generate returns a unique transaction id Generate() []byte }
TxnIDGenerator txn id generator
type TxnOperator ¶
type TxnOperator interface { // Txn returns the current txn metadata Txn() txn.TxnMeta // TxnRef returns pointer of current txn metadata. In RC mode, txn's snapshot ts // will updated before statement executed. TxnRef() *txn.TxnMeta // Snapshot a snapshot of the transaction handle that can be passed around the // network. In some scenarios, operations of a transaction are executed on multiple // CN nodes for performance acceleration. But with only one CN coordinator, Snapshot // can be used to recover the transaction operation handle at a non-CN coordinator // node, or it can be used to pass information back to the transaction coordinator // after the non-CN coordinator completes the transaction operation. Snapshot() ([]byte, error) // UpdateSnapshot in some scenarios, we need to boost the snapshotTimestamp to eliminate // the w-w conflict. // If ts is empty, it will use the latest commit timestamp which is received from DN. UpdateSnapshot(ctx context.Context, ts timestamp.Timestamp) error // ApplySnapshot CN coordinator applies a snapshot of the non-coordinator's transaction // operation information. ApplySnapshot(data []byte) error // Read transaction read operation, the operator routes the message based // on the given DN node information and waits for the read data synchronously. // The transaction has been aborted if ErrTxnAborted returned. // After use, SendResult needs to call the Release method Read(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error) // Write transaction write operation, and the operator will record the DN // nodes written by the current transaction, and when it finds that multiple // DN nodes are written, it will start distributed transaction processing. // The transaction has been aborted if ErrTxnAborted returned. // After use, SendResult needs to call the Release method Write(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error) // WriteAndCommit is similar to Write, but commit the transaction after write. // After use, SendResult needs to call the Release method WriteAndCommit(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error) // Commit the transaction. If data has been written to multiple DN nodes, a // 2pc distributed transaction commit process is used. Commit(ctx context.Context) error // Rollback the transaction. Rollback(ctx context.Context) error // AddLockTable for pessimistic transactions, if the current transaction is successfully // locked, the metadata corresponding to the lockservice needs to be recorded to the txn, and // at transaction commit time, the metadata of all lockservices accessed by the transaction // will be committed to dn to check. If the metadata of the lockservice changes in [lock, commit], // the transaction will be rolled back. AddLockTable(locktable lock.LockTable) error // AddWorkspace for the transaction AddWorkspace(workspace Workspace) // GetWorkspace from the transaction GetWorkspace() Workspace }
TxnOperator operator for transaction clients, handling read and write requests for transactions, and handling distributed transactions across DN nodes. Note: For Error returned by Read/Write/WriteAndCommit/Commit/Rollback, need to check if it is a moerr.ErrDNShardNotFound error, if so, the DN information held is out of date and needs to be reloaded by HAKeeper.
type TxnOption ¶
type TxnOption func(*txnOperator)
TxnOption options for setup transaction FIXME(fagongzi): refactor TxnOption to avoid mem alloc
func WithSnapshotTS ¶ added in v0.7.0
WithSnapshotTS use a spec snapshot timestamp to build TxnOperator.
func WithTxnCNCoordinator ¶
func WithTxnCNCoordinator() TxnOption
WithTxnCNCoordinator set cn txn coordinator
func WithTxnCacheWrite ¶
func WithTxnCacheWrite() TxnOption
WithTxnCacheWrite Set cache write requests, after each Write call, the request will not be sent to the DN node immediately, but stored in the Coordinator's memory, and the Coordinator will choose the right time to send the cached requests. The following scenarios trigger the sending of requests to DN:
- Before read, because the Coordinator is not aware of the format and content of the written data, it is necessary to send the cached write requests to the corresponding DN node each time Read is called, used to implement "read your write".
- Before commit, obviously, the cached write requests needs to be sent to the corresponding DN node before commit.
func WithTxnCreateBy ¶ added in v0.8.0
WithTxnCreateBy set txn create by.Used to check leak txn
func WithTxnDisable1PCOpt ¶
func WithTxnDisable1PCOpt() TxnOption
WithTxnDisable1PCOpt disable 1pc opt on distributed transaction. By default, mo enables 1pc optimization for distributed transactions. For write operations, if all partitions' prepares are executed successfully, then the transaction is considered committed and returned directly to the client. Partitions' prepared data are committed asynchronously.
func WithTxnIsolation ¶ added in v0.8.0
func WithTxnIsolation(value txn.TxnIsolation) TxnOption
WithTxnIsolation set txn isolation
func WithTxnLockService ¶ added in v0.8.0
func WithTxnLockService(lockService lockservice.LockService) TxnOption
WithTxnLockService set txn lock service
func WithTxnMode ¶ added in v0.8.0
WithTxnMode set txn mode
type Workspace ¶ added in v0.8.0
type Workspace interface { // IncrStatementID incr the execute statement id. It maintains the statement id, first statement is 1, // second is 2, and so on. If in rc mode, snapshot will updated to latest applied commit ts from dn. And // workspace will update snapshot data for later read request. IncrStatementID(ctx context.Context, commit bool) error // RollbackLastStatement rollback the last statement. RollbackLastStatement(ctx context.Context) error Commit(ctx context.Context) error Rollback(ctx context.Context) error }