client

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ClosedEvent is the EventType value for the txn closed event.
	ClosedEvent = EventType(0)
)

Variables

This section is empty.

Functions

func RunTxnTests added in v0.8.0

func RunTxnTests(fn func(TxnClient, rpc.TxnSender), opts ...TxnClientCreateOption)

RunTxnTests runs txn tests.

func SetupRuntimeTxnOptions added in v0.8.0

func SetupRuntimeTxnOptions(
	rt runtime.Runtime,
	m txn.TxnMode,
	iso txn.TxnIsolation)

SetupRuntimeTxnOptions sets up runtime-based txn options.

Types

type EventType added in v0.8.0

type EventType int

EventType is the type of a txn event.

type Lock added in v1.0.0

// Lock describes a wait lock: the table it targets, the rows or ranges being
// locked, and the lock options.
type Lock struct {
	// TableID is the id of the table the lock applies to.
	TableID uint64
	// Rows holds the locked rows. If the granularity is row, it contains all point
	// lock keys; otherwise it is a flat list of range bounds:
	// range1start, range1end, range2start, range2end, ...
	Rows [][]byte
	// Options holds the lock options, including the lock type (row|range) and the
	// lock mode.
	Options lock.LockOptions
}

Lock describes a wait lock.

type TimestampWaiter added in v0.8.0

// TimestampWaiter is used to wait for the latest commit timestamp to reach a
// specified timestamp.
type TimestampWaiter interface {
	// GetTimestamp gets the latest commit ts to use as the snapshot ts of a new txn.
	// It keeps blocking while the latest commit timestamp received from TN is less
	// than the given value.
	GetTimestamp(context.Context, timestamp.Timestamp) (timestamp.Timestamp, error)
	// NotifyLatestCommitTS notifies the waiter of the latest timestamp received from
	// TN. An applied logtail commit ts corresponds to an epoch. Whenever the logtail
	// connection between CN and TN is reset, the epoch is reset and all ts of the old
	// epoch should be invalidated.
	NotifyLatestCommitTS(appliedTS timestamp.Timestamp)
	// Pause pauses the timestamp waiter and cancels all waiters in it.
	// They will not wait for a newer timestamp anymore.
	Pause()
	// Resume resumes the cancel channel in the timestamp waiter after all
	// transactions are aborted.
	Resume()
	// CancelC returns the cancel channel of the timestamp waiter. A nil channel means
	// that the logtail consumer is reconnecting to the logtail server and is aborting
	// all transactions. At this time, new transactions cannot be opened.
	CancelC() chan struct{}
	// Close closes the timestamp waiter.
	Close()
	// LatestTS returns the latest timestamp of the waiter.
	LatestTS() timestamp.Timestamp
}

TimestampWaiter is used to wait for the timestamp to reach a specified timestamp. In the Push mode of LogTail's Event, the TN pushes the logtail to the subscribed CN once a transaction has been committed. So there is an actual wait (last push commit ts >= start ts). This is unfriendly to TP, so we can lose some freshness and use the latest commit ts received from the current TN push as the start ts of the transaction, which eliminates this physical wait.

func NewTimestampWaiter added in v0.8.0

func NewTimestampWaiter() TimestampWaiter

NewTimestampWaiter creates a timestamp waiter.

type TxnClient

// TxnClient is the transaction client, the operational entry point for
// transactions.
type TxnClient interface {
	// MinTimestamp returns the minimum active transaction timestamp.
	MinTimestamp() timestamp.Timestamp
	// New returns a TxnOperator to handle read and write operations for a
	// transaction.
	New(ctx context.Context, commitTS timestamp.Timestamp, options ...TxnOption) (TxnOperator, error)
	// NewWithSnapshot creates a txn operator from a snapshot. The snapshot must
	// be from a CN coordinator txn operator.
	NewWithSnapshot(snapshot []byte) (TxnOperator, error)
	// AbortAllRunningTxn rolls back all running transactions, but still keeps their
	// workspaces to avoid panics.
	AbortAllRunningTxn()
	// Close closes client.sender.
	Close() error
	// WaitLogTailAppliedAt waits until the log tail has been applied at ts.
	WaitLogTailAppliedAt(ctx context.Context, ts timestamp.Timestamp) (timestamp.Timestamp, error)
	// GetLatestCommitTS gets the latest commit timestamp.
	GetLatestCommitTS() timestamp.Timestamp
	// SyncLatestCommitTS syncs the latest commit timestamp.
	SyncLatestCommitTS(timestamp.Timestamp)
	// GetSyncLatestCommitTSTimes returns the number of times the latest commit ts
	// has been synced.
	GetSyncLatestCommitTSTimes() uint64
	// Pause pauses the txn client to prevent new txns from being created.
	Pause()
	// Resume resumes the txn client to allow new txns to be created.
	Resume()
	// RefreshExpressionEnabled returns true if the refresh expression feature is enabled.
	RefreshExpressionEnabled() bool
	// CNBasedConsistencyEnabled returns true if the CN-based consistency feature is enabled.
	CNBasedConsistencyEnabled() bool

	// IterTxns iterates over all txns, passing each TxnOverview to the given function.
	// NOTE(review): the bool result presumably controls whether iteration continues;
	// confirm against the implementation.
	IterTxns(func(TxnOverview) bool)

	// GetState returns the current state of the txn client.
	GetState() TxnState
}

TxnClient transaction client, the operational entry point for transactions. Each CN node holds one instance of TxnClient.

func NewTxnClient

func NewTxnClient(
	sender rpc.TxnSender,
	options ...TxnClientCreateOption) TxnClient

NewTxnClient create a txn client with TxnSender and Options

type TxnClientCreateOption

type TxnClientCreateOption func(*txnClient)

TxnClientCreateOption options for create txn

func WithEnableCNBasedConsistency added in v0.8.0

func WithEnableCNBasedConsistency() TxnClientCreateOption

WithEnableCNBasedConsistency let all transactions on a CN see writes committed by other transactions before them. When this feature is enabled, the client maintains a CN-Based commit timestamp, and when opening a new transaction, it adjusts the transaction's snapshot timestamp to at least >= lastCommitTimestamp, so that it can see the writes of the previously committed transaction

func WithEnableLeakCheck added in v0.8.0

func WithEnableLeakCheck(
	maxActiveAges time.Duration,
	leakHandleFunc func(txnID []byte, createAt time.Time, createBy string)) TxnClientCreateOption

WithEnableLeakCheck enables the txn leak check. It is used to find any txn that is not committed or rolled back.

func WithEnableRefreshExpression added in v0.8.0

func WithEnableRefreshExpression() TxnClientCreateOption

WithEnableRefreshExpression in RC mode, in the event of a conflict, the later transaction needs to see the latest data after the previous transaction commits. At this time we need to re-read the data, re-read the latest data, and re-compute the expression.

func WithEnableSacrificingFreshness added in v0.8.0

func WithEnableSacrificingFreshness() TxnClientCreateOption

When making this optimization, there are some scenarios where data consistency must be guaranteed, such as a database connection in a session where the latter transaction must be able to read the data committed by the previous transaction, then it is necessary to maintain a Session-level transaction last commit time, and the start time of the next transaction cannot be less than this value.

If we need to ensure that all the transactions on a CN can read the writes of the previous committed transaction, then we can use WithEnableCNBasedConsistency to turn on.

func WithLockService added in v0.8.0

func WithLockService(lockService lockservice.LockService) TxnClientCreateOption

WithLockService setup lock service

func WithMaxActiveTxn added in v1.0.0

func WithMaxActiveTxn(n int) TxnClientCreateOption

WithMaxActiveTxn sets the maximum number of active txns in the current CN. If the maximum is reached, the txn is added to a FIFO queue. Default is unlimited.

func WithNormalStateNoWait added in v1.1.0

func WithNormalStateNoWait(t bool) TxnClientCreateOption

WithNormalStateNoWait sets the normalStateNoWait value of txnClient.

func WithTimestampWaiter added in v0.8.0

func WithTimestampWaiter(waiter TimestampWaiter) TxnClientCreateOption

WithTimestampWaiter setup timestamp waiter to get the latest applied committed timestamp from logtail.

func WithTxnIDGenerator

func WithTxnIDGenerator(generator TxnIDGenerator) TxnClientCreateOption

WithTxnIDGenerator setup txn id generator

func WithTxnLimit added in v1.0.0

func WithTxnLimit(n int) TxnClientCreateOption

WithTxnLimit flow control of transaction creation, maximum number of transactions per second

type TxnIDGenerator

// TxnIDGenerator generates transaction ids.
type TxnIDGenerator interface {
	// Generate returns a unique transaction id.
	Generate() []byte
}

TxnIDGenerator txn id generator

type TxnOperator

// TxnOperator is the operator for transaction clients. It handles read and write
// requests for a transaction, as well as commit and rollback.
type TxnOperator interface {
	// GetOverview returns the txn overview.
	GetOverview() TxnOverview

	// Txn returns the current txn metadata.
	Txn() txn.TxnMeta
	// TxnRef returns a pointer to the current txn metadata. In RC mode, the txn's
	// snapshot ts will be updated before a statement is executed.
	TxnRef() *txn.TxnMeta
	// Snapshot returns a snapshot of the transaction handle that can be passed around
	// the network. In some scenarios, operations of a transaction are executed on
	// multiple CN nodes for performance acceleration. But with only one CN coordinator,
	// Snapshot can be used to recover the transaction operation handle at a non-CN
	// coordinator node, or it can be used to pass information back to the transaction
	// coordinator after the non-CN coordinator completes the transaction operation.
	Snapshot() ([]byte, error)
	// UpdateSnapshot boosts the snapshotTimestamp; in some scenarios this is needed to
	// eliminate the w-w conflict.
	// If ts is empty, it will use the latest commit timestamp received from TN.
	UpdateSnapshot(ctx context.Context, ts timestamp.Timestamp) error
	// SnapshotTS returns the snapshot timestamp of the transaction.
	SnapshotTS() timestamp.Timestamp
	// Status returns the current transaction status.
	Status() txn.TxnStatus
	// ApplySnapshot is used by the CN coordinator to apply a snapshot of the
	// non-coordinator's transaction operation information.
	ApplySnapshot(data []byte) error
	// Read is the transaction read operation. The operator routes the message based
	// on the given TN node information and waits for the read data synchronously.
	// The transaction has been aborted if ErrTxnAborted is returned.
	// After use, the Release method of SendResult needs to be called.
	Read(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error)
	// Write is the transaction write operation. The operator records the TN nodes
	// written by the current transaction, and when it finds that multiple TN nodes
	// are written, it starts distributed transaction processing.
	// The transaction has been aborted if ErrTxnAborted is returned.
	// After use, the Release method of SendResult needs to be called.
	Write(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error)
	// WriteAndCommit is similar to Write, but commits the transaction after the write.
	// After use, the Release method of SendResult needs to be called.
	WriteAndCommit(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error)
	// Commit commits the transaction. If data has been written to multiple TN nodes,
	// a 2pc distributed transaction commit process is used.
	Commit(ctx context.Context) error
	// Rollback rolls back the transaction.
	Rollback(ctx context.Context) error

	// AddLockTable is for pessimistic transactions. If the current transaction has
	// locked successfully, the metadata of the corresponding lockservice needs to be
	// recorded in the txn, and at transaction commit time, the metadata of all lock
	// services accessed by the transaction will be committed to TN to check. If the
	// metadata of the lockservice changes in [lock, commit], the transaction will be
	// rolled back.
	AddLockTable(locktable lock.LockTable) error
	// AddWaitLock adds a wait lock for the current txn.
	// NOTE(review): the returned uint64 is presumably the key accepted by
	// RemoveWaitLock; confirm against the implementation.
	AddWaitLock(tableID uint64, rows [][]byte, opt lock.LockOptions) uint64
	// RemoveWaitLock removes a wait lock for the current txn.
	RemoveWaitLock(key uint64)
	// LockSkipped returns true if the lock needs to be skipped.
	LockSkipped(tableID uint64, mode lock.LockMode) bool

	// AddWorkspace adds a workspace for the transaction.
	AddWorkspace(workspace Workspace)
	// GetWorkspace gets the workspace from the transaction.
	GetWorkspace() Workspace

	// ResetRetry and IsRetry are undocumented in the original.
	// NOTE(review): presumably they set and report a retry flag on the txn; confirm
	// against the implementation.
	ResetRetry(bool)
	IsRetry() bool

	// SetOpenLog and IsOpenLog are undocumented in the original.
	// NOTE(review): presumably they set and report the txn "open log" flag; confirm
	// against the implementation.
	SetOpenLog(bool)
	IsOpenLog() bool

	// AppendEventCallback appends callbacks for an event. All appended callbacks will
	// be called sequentially if the event happens.
	AppendEventCallback(event EventType, callbacks ...func(txn.TxnMeta))

	// Debug sends a debug request to TN. After use, the Release method of SendResult
	// needs to be called.
	Debug(ctx context.Context, ops []txn.TxnRequest) (*rpc.SendResult, error)
}

TxnOperator is the operator for transaction clients, handling read and write requests for transactions, and handling distributed transactions across TN nodes. Note: when Read/Write/WriteAndCommit/Commit/Rollback returns an error, check whether it is a moerr.ErrDNShardNotFound error; if so, the TN information held is out of date and needs to be reloaded by HAKeeper.

type TxnOption

type TxnOption func(*txnOperator)

TxnOption is an option for setting up a transaction. FIXME(fagongzi): refactor TxnOption to avoid mem alloc.

func WithSnapshotTS added in v0.7.0

func WithSnapshotTS(ts timestamp.Timestamp) TxnOption

WithSnapshotTS use a spec snapshot timestamp to build TxnOperator.

func WithTxnCNCoordinator

func WithTxnCNCoordinator() TxnOption

WithTxnCNCoordinator set cn txn coordinator

func WithTxnCacheWrite

func WithTxnCacheWrite() TxnOption

WithTxnCacheWrite sets cache write requests: after each Write call, the request will not be sent to the TN node immediately, but stored in the Coordinator's memory, and the Coordinator will choose the right time to send the cached requests. The following scenarios trigger the sending of requests to TN:

  1. Before read, because the Coordinator is not aware of the format and content of the written data, it is necessary to send the cached write requests to the corresponding TN node each time Read is called, used to implement "read your write".
  2. Before commit, obviously, the cached write requests need to be sent to the corresponding TN node before commit.

func WithTxnCreateBy added in v0.8.0

func WithTxnCreateBy(createBy string) TxnOption

WithTxnCreateBy sets the txn's "create by" value. Used to check for leaked txns.

func WithTxnDisable1PCOpt

func WithTxnDisable1PCOpt() TxnOption

WithTxnDisable1PCOpt disable 1pc opt on distributed transaction. By default, mo enables 1pc optimization for distributed transactions. For write operations, if all partitions' prepares are executed successfully, then the transaction is considered committed and returned directly to the client. Partitions' prepared data are committed asynchronously.

func WithTxnIsolation added in v0.8.0

func WithTxnIsolation(value txn.TxnIsolation) TxnOption

WithTxnIsolation set txn isolation

func WithTxnLockService added in v0.8.0

func WithTxnLockService(lockService lockservice.LockService) TxnOption

WithTxnLockService set txn lock service

func WithTxnMode added in v0.8.0

func WithTxnMode(value txn.TxnMode) TxnOption

WithTxnMode set txn mode

func WithTxnOpenLog added in v1.1.0

func WithTxnOpenLog() TxnOption

WithTxnOpenLog set txn open log

func WithTxnReadyOnly

func WithTxnReadyOnly() TxnOption

WithTxnReadyOnly setup readonly flag

func WithTxnSkipLock added in v1.1.0

func WithTxnSkipLock(
	tables []uint64,
	modes []lock.LockMode) TxnOption

WithTxnSkipLock skip txn lock on specified tables

func WithUserTxn added in v1.0.0

func WithUserTxn() TxnOption

WithUserTxn setup user transaction flag. Only user transactions need to be controlled for the maximum number of active transactions.

type TxnOverview added in v1.0.0

// TxnOverview is an overview of a txn: its creation time, metadata, whether it
// is a user transaction, and its wait locks.
type TxnOverview struct {
	// CreateAt is the time the txn was created at.
	CreateAt time.Time
	// Meta is the txn metadata.
	Meta txn.TxnMeta
	// UserTxn is true if this is a user transaction.
	UserTxn bool
	// WaitLocks holds the txn's wait locks.
	WaitLocks []Lock
}

TxnOverview txn overview include meta and status

type TxnState added in v1.1.0

// TxnState describes the current state of a txn client.
type TxnState struct {
	// State is undocumented in the original.
	// NOTE(review): presumably an integer code for the client state; the meaning of
	// each value is not visible here, so confirm against the implementation.
	State int
	// Users is the number of active user txns.
	Users int
	// ActiveTxns lists all active txns.
	// NOTE(review): entries are presumably txn ids rendered as strings; confirm.
	ActiveTxns []string
	// WaitActiveTxns is the FIFO queue of txns that are ready to become active.
	WaitActiveTxns []string
	// LatestTS is the latest timestamp of the txn client.
	LatestTS timestamp.Timestamp
}

type Workspace added in v0.8.0

// Workspace is the workspace attached to a transaction (see
// TxnOperator.AddWorkspace / GetWorkspace).
type Workspace interface {
	// StartStatement marks that a statement is running.
	StartStatement()
	// EndStatement marks that a statement has completed.
	EndStatement()

	// IncrStatementID increments the id of the statement being executed. It maintains
	// the statement id: the first statement is 1, the second is 2, and so on. In RC
	// mode, the snapshot will be updated to the latest applied commit ts from TN, and
	// the workspace will update snapshot data for later read requests.
	// NOTE(review): the meaning of the commit flag is not documented here; confirm
	// against the implementation.
	IncrStatementID(ctx context.Context, commit bool) error
	// RollbackLastStatement rolls back the last statement.
	RollbackLastStatement(ctx context.Context) error

	// Adjust adjusts the workspace: it puts an update's delete+insert into the
	// correct order and merges the workspace.
	Adjust() error

	// Commit and Rollback are undocumented in the original.
	// NOTE(review): presumably Commit returns the txn requests to be sent when the
	// transaction commits, and Rollback discards the workspace's changes; confirm
	// against the implementation.
	Commit(ctx context.Context) ([]txn.TxnRequest, error)
	Rollback(ctx context.Context) error

	// IncrSQLCount and GetSQLCount are undocumented in the original.
	// NOTE(review): presumably they increment and return a per-workspace SQL
	// counter; confirm against the implementation.
	IncrSQLCount()
	GetSQLCount() uint64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL