Documentation ¶
Index ¶
- func WithTxnSession(ctx context.Context, session *TxnSession) context.Context
- type TxnManager
- func (m *TxnManager) BeginNewTxn(ctx context.Context, timetick uint64, keepalive time.Duration) (*TxnSession, error)
- func (m *TxnManager) CleanupTxnUntil(ts uint64)
- func (m *TxnManager) GetSessionOfTxn(id message.TxnID) (*TxnSession, error)
- func (m *TxnManager) GracefulClose(ctx context.Context) error
- type TxnSession
- func (s *TxnSession) AddNewMessage(ctx context.Context, timetick uint64) error
- func (s *TxnSession) AddNewMessageDoneAndKeepalive(timetick uint64)
- func (s *TxnSession) AddNewMessageFail()
- func (s *TxnSession) BeginDone()
- func (s *TxnSession) BeginRollback()
- func (s *TxnSession) Cleanup()
- func (s *TxnSession) CommitDone()
- func (s *TxnSession) IsExpiredOrDone(ts uint64) bool
- func (s *TxnSession) RegisterCleanup(f func(), ts uint64)
- func (s *TxnSession) RequestCommitAndWait(ctx context.Context, timetick uint64) error
- func (s *TxnSession) RequestRollback(ctx context.Context, timetick uint64) error
- func (s *TxnSession) RollbackDone()
- func (s *TxnSession) State() message.TxnState
- func (s *TxnSession) TxnContext() message.TxnContext
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithTxnSession ¶
func WithTxnSession(ctx context.Context, session *TxnSession) context.Context
WithTxnSession returns a new context with the TxnSession.
Types ¶
type TxnManager ¶
type TxnManager struct {
// contains filtered or unexported fields
}
TxnManager is the manager of transactions. We don't support cross wal transaction by now and We don't support the transaction lives after the wal transferred to another streaming node.
func NewTxnManager ¶
func NewTxnManager(pchannel types.PChannelInfo) *TxnManager
NewTxnManager creates a new transaction manager.
func (*TxnManager) BeginNewTxn ¶
func (m *TxnManager) BeginNewTxn(ctx context.Context, timetick uint64, keepalive time.Duration) (*TxnSession, error)
BeginNewTxn starts a new transaction with a session. We only support a transaction work on a streaming node, once the wal is transferred to another node, the transaction is treated as expired (rollback), and user will got a expired error, then perform a retry.
func (*TxnManager) CleanupTxnUntil ¶
func (m *TxnManager) CleanupTxnUntil(ts uint64)
CleanupTxnUntil cleans up the transactions until the specified timestamp.
func (*TxnManager) GetSessionOfTxn ¶
func (m *TxnManager) GetSessionOfTxn(id message.TxnID) (*TxnSession, error)
GetSessionOfTxn returns the session of the transaction.
func (*TxnManager) GracefulClose ¶
func (m *TxnManager) GracefulClose(ctx context.Context) error
GracefulClose waits for all transactions to be cleaned up.
type TxnSession ¶
type TxnSession struct {
// contains filtered or unexported fields
}
TxnSession is a session for a transaction.
func GetTxnSessionFromContext ¶
func GetTxnSessionFromContext(ctx context.Context) *TxnSession
GetTxnSessionFromContext returns the TxnSession from the context.
func (*TxnSession) AddNewMessage ¶
func (s *TxnSession) AddNewMessage(ctx context.Context, timetick uint64) error
AddNewMessage adds a new message to the session.
func (*TxnSession) AddNewMessageDoneAndKeepalive ¶
func (s *TxnSession) AddNewMessageDoneAndKeepalive(timetick uint64)
AddNewMessageDoneAndKeepalive decreases the in flight count of the session and keepalive the session. notify the committedWait channel if the in flight count is 0 and committed waited.
func (*TxnSession) AddNewMessageFail ¶
func (s *TxnSession) AddNewMessageFail()
AddNewMessageFail decreases the in flight count of the session but not refresh the lease.
func (*TxnSession) BeginDone ¶
func (s *TxnSession) BeginDone()
BeginDone marks the transaction as in flight.
func (*TxnSession) BeginRollback ¶
func (s *TxnSession) BeginRollback()
BeginRollback marks the transaction as rollbacked at begin state.
func (*TxnSession) CommitDone ¶
func (s *TxnSession) CommitDone()
CommitDone marks the transaction as committed.
func (*TxnSession) IsExpiredOrDone ¶
func (s *TxnSession) IsExpiredOrDone(ts uint64) bool
isExpiredOrDone checks if the session is expired or done.
func (*TxnSession) RegisterCleanup ¶
func (s *TxnSession) RegisterCleanup(f func(), ts uint64)
RegisterCleanup registers the cleanup function for the session. It will be called when the session is expired or done. !!! A committed/rollbacked or expired session will never be seen by other components. so the cleanup function will always be called.
func (*TxnSession) RequestCommitAndWait ¶
func (s *TxnSession) RequestCommitAndWait(ctx context.Context, timetick uint64) error
RequestCommitAndWait request commits the transaction and waits for the all messages sent.
func (*TxnSession) RequestRollback ¶
func (s *TxnSession) RequestRollback(ctx context.Context, timetick uint64) error
RequestRollback rolls back the transaction.
func (*TxnSession) RollbackDone ¶
func (s *TxnSession) RollbackDone()
RollbackDone marks the transaction as rollbacked.
func (*TxnSession) State ¶
func (s *TxnSession) State() message.TxnState
State returns the state of the session.
func (*TxnSession) TxnContext ¶
func (s *TxnSession) TxnContext() message.TxnContext
TxnContext returns the txn context of the session.