txnbase

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CmdPointer int16 = iota
	CmdDeleteBitmap
	CmdBatch
	CmdAppend
	CmdDelete
	CmdComposed
	CmdTxn
	CmdTxnState
	CmdCustomized
)
View Source
const (
	TSUncommitted int32 = iota
	TSCommitting
	TSCommitted
	TSRollbacking
	TSRollbacked
)
View Source
const (
	OpCommit = iota
	OpRollback
	OpPrepare
	OpCommitting
	OpInvalid
)
View Source
const (
	EventRollback = iota + 1
	EventCommitting
	EventCommit
)
View Source
const (
	IDSize = 8 + 8 + 8 + 4 + 2 + 1
)
View Source
const (
	MaxNodeRows uint32 = 10000
)

Variables

View Source
var (
	SnapshotAttr_StartTS       = "start_ts"
	SnapshotAttr_PrepareTS     = "prepare_ts"
	SnapshotAttr_CommitTS      = "commit_ts"
	SnapshotAttr_LogIndex_LSN  = "log_index_lsn"
	SnapshotAttr_LogIndex_CSN  = "log_index_csn"
	SnapshotAttr_LogIndex_Size = "log_index_size"
)
View Source
var DefaultTxnFactory = func(
	mgr *TxnManager,
	store txnif.TxnStore,
	id []byte,
	startTS types.TS,
	info []byte) txnif.AsyncTxn {
	return NewTxn(mgr, store, id, startTS, info)
}
View Source
var (
	ErrTransferTransactionState = moerr.NewInternalErrorNoCtx("tae: transfer transaction state error")
)
View Source
var NoopStoreFactory = func() txnif.TxnStore { return new(NoopTxnStore) }

Functions

func BuildCommandFrom

func BuildCommandFrom(r io.Reader) (cmd txnif.TxnCmd, n int64, err error)

func CompareTxnMVCCNode added in v0.6.0

func CompareTxnMVCCNode(e, o *TxnMVCCNode) int

func IDCtxToID

func IDCtxToID(buf []byte) string

func IDToIDCtx

func IDToIDCtx(id uint64) []byte

func IsCustomizedCmd

func IsCustomizedCmd(cmd txnif.TxnCmd) bool

func MarshalID

func MarshalID(id *common.ID) []byte

func TxnField

func TxnField(txn txnif.AsyncTxn) zap.Field

func TxnMgrField

func TxnMgrField(mgr *TxnManager) zap.Field

func UnmarshalID

func UnmarshalID(buf []byte) *common.ID

Types

type BaseCmd

type BaseCmd struct{}

func (*BaseCmd) Close

func (base *BaseCmd) Close()

type BaseCustomizedCmd

type BaseCustomizedCmd struct {
	BaseCmd
	ID   uint32
	Impl txnif.TxnCmd
}

func NewBaseCustomizedCmd

func NewBaseCustomizedCmd(id uint32, impl txnif.TxnCmd) *BaseCustomizedCmd

func (*BaseCustomizedCmd) GetID

func (c *BaseCustomizedCmd) GetID() uint32

type BatchCmd

type BatchCmd struct {
	BaseCmd
	Bat *containers.Batch
}

func NewBatchCmd

func NewBatchCmd(bat *containers.Batch) *BatchCmd

func (*BatchCmd) ApplyCommit added in v0.6.0

func (e *BatchCmd) ApplyCommit()

func (*BatchCmd) ApplyRollback added in v0.6.0

func (e *BatchCmd) ApplyRollback()

func (*BatchCmd) Close

func (e *BatchCmd) Close()

func (*BatchCmd) Desc

func (e *BatchCmd) Desc() string

func (*BatchCmd) GetType

func (e *BatchCmd) GetType() int16

func (*BatchCmd) Marshal

func (e *BatchCmd) Marshal() (buf []byte, err error)

func (*BatchCmd) ReadFrom

func (e *BatchCmd) ReadFrom(r io.Reader) (n int64, err error)

func (*BatchCmd) SetReplayTxn added in v0.6.0

func (e *BatchCmd) SetReplayTxn(_ txnif.AsyncTxn)

func (*BatchCmd) String

func (e *BatchCmd) String() string

func (*BatchCmd) Unmarshal

func (e *BatchCmd) Unmarshal(buf []byte) error

func (*BatchCmd) VerboseString

func (e *BatchCmd) VerboseString() string

func (*BatchCmd) WriteTo

func (e *BatchCmd) WriteTo(w io.Writer) (n int64, err error)

type ComposedCmd

type ComposedCmd struct {
	BaseCmd
	Cmds    []txnif.TxnCmd
	CmdSize uint32
}

func NewComposedCmd

func NewComposedCmd() *ComposedCmd

func (*ComposedCmd) AddCmd

func (cc *ComposedCmd) AddCmd(cmd txnif.TxnCmd)

func (*ComposedCmd) ApplyCommit added in v0.6.0

func (cc *ComposedCmd) ApplyCommit()

func (*ComposedCmd) ApplyRollback added in v0.6.0

func (cc *ComposedCmd) ApplyRollback()

func (*ComposedCmd) Close

func (cc *ComposedCmd) Close()

func (*ComposedCmd) Desc

func (cc *ComposedCmd) Desc() string

func (*ComposedCmd) GetType

func (cc *ComposedCmd) GetType() int16

func (*ComposedCmd) Marshal

func (cc *ComposedCmd) Marshal() (buf []byte, err error)

func (*ComposedCmd) ReadFrom

func (cc *ComposedCmd) ReadFrom(r io.Reader) (n int64, err error)

func (*ComposedCmd) SetCmdSize

func (cc *ComposedCmd) SetCmdSize(size uint32)

func (*ComposedCmd) SetReplayTxn added in v0.6.0

func (cc *ComposedCmd) SetReplayTxn(txn txnif.AsyncTxn)

func (*ComposedCmd) String

func (cc *ComposedCmd) String() string

func (*ComposedCmd) ToDesc

func (cc *ComposedCmd) ToDesc(prefix string) string

func (*ComposedCmd) ToString

func (cc *ComposedCmd) ToString(prefix string) string

func (*ComposedCmd) ToVerboseString

func (cc *ComposedCmd) ToVerboseString(prefix string) string

func (*ComposedCmd) Unmarshal

func (cc *ComposedCmd) Unmarshal(buf []byte) (err error)

func (*ComposedCmd) VerboseString

func (cc *ComposedCmd) VerboseString() string

func (*ComposedCmd) WriteTo

func (cc *ComposedCmd) WriteTo(w io.Writer) (n int64, err error)

type CustomizedCmd

type CustomizedCmd interface {
	GetID() uint32
}

type DeleteBitmapCmd

type DeleteBitmapCmd struct {
	BaseCmd
	Bitmap *roaring.Bitmap
}

func NewDeleteBitmapCmd

func NewDeleteBitmapCmd(bitmap *roaring.Bitmap) *DeleteBitmapCmd

func (*DeleteBitmapCmd) ApplyCommit added in v0.6.0

func (e *DeleteBitmapCmd) ApplyCommit()

func (*DeleteBitmapCmd) ApplyRollback added in v0.6.0

func (e *DeleteBitmapCmd) ApplyRollback()

func (*DeleteBitmapCmd) Desc

func (e *DeleteBitmapCmd) Desc() string

func (*DeleteBitmapCmd) GetType

func (e *DeleteBitmapCmd) GetType() int16

func (*DeleteBitmapCmd) Marshal

func (e *DeleteBitmapCmd) Marshal() (buf []byte, err error)

func (*DeleteBitmapCmd) ReadFrom

func (e *DeleteBitmapCmd) ReadFrom(r io.Reader) (n int64, err error)

func (*DeleteBitmapCmd) SetReplayTxn added in v0.6.0

func (e *DeleteBitmapCmd) SetReplayTxn(_ txnif.AsyncTxn)

func (*DeleteBitmapCmd) String

func (e *DeleteBitmapCmd) String() string

func (*DeleteBitmapCmd) Unmarshal

func (e *DeleteBitmapCmd) Unmarshal(buf []byte) error

func (*DeleteBitmapCmd) VerboseString

func (e *DeleteBitmapCmd) VerboseString() string

func (*DeleteBitmapCmd) WriteTo

func (e *DeleteBitmapCmd) WriteTo(w io.Writer) (n int64, err error)

type MVCCChain added in v0.6.0

type MVCCChain struct {
	*sync.RWMutex
	MVCC *common.GenericSortedDList[txnif.MVCCNode]
	// contains filtered or unexported fields
}

func NewMVCCChain added in v0.6.0

func NewMVCCChain(comparefn func(txnif.MVCCNode, txnif.MVCCNode) int, newnodefn func() txnif.MVCCNode) *MVCCChain

func (*MVCCChain) Apply1PCCommit added in v0.6.0

func (be *MVCCChain) Apply1PCCommit(index *wal.Index) error

func (*MVCCChain) ApplyCommit added in v0.6.0

func (be *MVCCChain) ApplyCommit(index *wal.Index) error

func (*MVCCChain) ApplyRollback added in v0.6.0

func (be *MVCCChain) ApplyRollback(index *wal.Index) error

func (*MVCCChain) CheckConflict added in v0.6.0

func (be *MVCCChain) CheckConflict(txn txnif.TxnReader) (err error)

func (*MVCCChain) CloneCommittedInRange added in v0.6.0

func (be *MVCCChain) CloneCommittedInRange(start, end types.TS) (ret *MVCCChain)

func (*MVCCChain) CloneLatestNode added in v0.6.0

func (be *MVCCChain) CloneLatestNode() (*MVCCChain, txnif.MVCCNode)

func (*MVCCChain) ClonePreparedInRange added in v0.6.0

func (be *MVCCChain) ClonePreparedInRange(start, end types.TS) (ret []txnif.MVCCNode)

func (*MVCCChain) Depth added in v0.6.0

func (be *MVCCChain) Depth() int

func (*MVCCChain) GetIndexes added in v0.6.0

func (be *MVCCChain) GetIndexes() []*wal.Index

func (*MVCCChain) GetLatestCommittedNode added in v0.6.0

func (be *MVCCChain) GetLatestCommittedNode() (node txnif.MVCCNode)

GetLatestCommittedNode gets the latest committed mvcc node. It's useful when check whether the catalog/metadata entry is deleted.

func (*MVCCChain) GetLatestNodeLocked added in v0.6.0

func (be *MVCCChain) GetLatestNodeLocked() txnif.MVCCNode

GetLatestNodeLocked gets the latest mvcc node. It is useful in making command, apply state(e.g. ApplyCommit), check confilct.

func (*MVCCChain) GetLogIndex added in v0.6.0

func (be *MVCCChain) GetLogIndex() *wal.Index

func (*MVCCChain) GetPrepareTs added in v0.6.0

func (be *MVCCChain) GetPrepareTs() types.TS

for replay

func (*MVCCChain) GetTxn added in v0.6.0

func (be *MVCCChain) GetTxn() txnif.TxnReader

func (*MVCCChain) GetVisibleNode added in v0.6.0

func (be *MVCCChain) GetVisibleNode(ts types.TS) (node txnif.MVCCNode)

GetVisibleNode gets mvcc node according to the timestamp. It returns the mvcc node in the same txn as the read txn or returns the latest mvcc node with commitTS less than the timestamp.

func (*MVCCChain) HasCommittedNode added in v0.6.0

func (be *MVCCChain) HasCommittedNode() bool

func (*MVCCChain) HasCommittedNodeInRange added in v0.6.0

func (be *MVCCChain) HasCommittedNodeInRange(start, end types.TS) (ok bool)

[start, end] Check whether there is any committed node in between [start, end] -----+------+-------+--------+----------+--------->

    |      |       |        |          |      Time
    |     start    |       end         |
commitTs <----- commitTs <--------- commitTs|uncommitted  <=  MVCCChain Header
   (1)            (2)                 (3)

func (*MVCCChain) HasUncommittedNode added in v0.6.0

func (be *MVCCChain) HasUncommittedNode() bool

func (*MVCCChain) Insert added in v0.6.0

func (be *MVCCChain) Insert(vun txnif.MVCCNode) (node *common.GenericDLNode[txnif.MVCCNode])

func (*MVCCChain) IsCommitted added in v0.6.0

func (be *MVCCChain) IsCommitted() bool

func (*MVCCChain) IsCommitting added in v0.6.0

func (be *MVCCChain) IsCommitting() bool

In /Catalog, there're three states: Active, Committing and Committed. A txn is Active before its CommitTs is allocated. It's Committed when its state will never change, i.e. TxnStateCommitted and TxnStateRollbacked. It's Committing when it's in any other state, including TxnStateCommitting, TxnStateRollbacking, TxnStatePrepared and so on. When read or write an entry, if the last txn of the entry is Committing, we wait for it. When write on an Entry, if there's an Active txn, we report w-w conflict.

func (*MVCCChain) IsCreatingOrAborted added in v0.6.0

func (be *MVCCChain) IsCreatingOrAborted() bool

func (*MVCCChain) IsEmpty added in v0.6.0

func (be *MVCCChain) IsEmpty() bool

func (*MVCCChain) LoopChain added in v0.6.0

func (be *MVCCChain) LoopChain(fn func(txnif.MVCCNode) bool)

func (*MVCCChain) MustOneNodeLocked added in v0.6.0

func (be *MVCCChain) MustOneNodeLocked() (txnif.MVCCNode, bool)

func (*MVCCChain) NeedWaitCommitting added in v0.6.0

func (be *MVCCChain) NeedWaitCommitting(ts types.TS) (bool, txnif.TxnReader)

func (*MVCCChain) PrepareCommit added in v0.6.0

func (be *MVCCChain) PrepareCommit() error

func (*MVCCChain) PrepareRollback added in v0.6.0

func (be *MVCCChain) PrepareRollback() (bool, error)

func (*MVCCChain) ReadAllFrom added in v0.6.0

func (be *MVCCChain) ReadAllFrom(r io.Reader) (n int64, err error)

func (*MVCCChain) ReadOneNodeFrom added in v0.6.0

func (be *MVCCChain) ReadOneNodeFrom(r io.Reader) (n int64, err error)

func (*MVCCChain) SearchNode added in v0.6.0

func (be *MVCCChain) SearchNode(o txnif.MVCCNode) (node txnif.MVCCNode)

It's only used in replay

func (*MVCCChain) StringLocked added in v0.6.0

func (be *MVCCChain) StringLocked() string

func (*MVCCChain) WriteAllTo added in v0.6.0

func (be *MVCCChain) WriteAllTo(w io.Writer) (n int64, err error)

func (*MVCCChain) WriteOneNodeTo added in v0.6.0

func (be *MVCCChain) WriteOneNodeTo(w io.Writer) (n int64, err error)

type MVCCSlice added in v0.6.0

type MVCCSlice struct {
	MVCC []txnif.MVCCNode
	// contains filtered or unexported fields
}

func NewMVCCSlice added in v0.6.0

func NewMVCCSlice(newnodefn func() txnif.MVCCNode,
	comparefn func(txnif.MVCCNode, txnif.MVCCNode) int) *MVCCSlice

func (*MVCCSlice) Close added in v0.7.0

func (be *MVCCSlice) Close()

func (*MVCCSlice) DeleteNode added in v0.6.0

func (be *MVCCSlice) DeleteNode(node txnif.MVCCNode)

func (*MVCCSlice) ForEach added in v0.6.0

func (be *MVCCSlice) ForEach(fn func(un txnif.MVCCNode) bool, reverse bool)

func (*MVCCSlice) GetCommittedNode added in v0.6.0

func (be *MVCCSlice) GetCommittedNode() (node txnif.MVCCNode)

GetCommittedNode gets the latest committed UpdateNode. It's useful when check whether the catalog/metadata entry is deleted.

func (*MVCCSlice) GetLastNonAbortedNode added in v0.6.0

func (be *MVCCSlice) GetLastNonAbortedNode() (node txnif.MVCCNode)

func (*MVCCSlice) GetNodeByOffset added in v0.6.0

func (be *MVCCSlice) GetNodeByOffset(offset int) txnif.MVCCNode

func (*MVCCSlice) GetNodeToReadByPrepareTS added in v0.6.0

func (be *MVCCSlice) GetNodeToReadByPrepareTS(ts types.TS) (offset int, node txnif.MVCCNode)

GetNodeToRead gets UpdateNode according to the timestamp. It returns the UpdateNode in the same txn as the read txn or returns the latest UpdateNode with commitTS less than the timestamp. todo getend or getcommitts

func (*MVCCSlice) GetTs added in v0.6.0

func (be *MVCCSlice) GetTs() types.TS

for replay

func (*MVCCSlice) GetUpdateNodeLocked added in v0.6.0

func (be *MVCCSlice) GetUpdateNodeLocked() txnif.MVCCNode

GetUpdateNode gets the latest UpdateNode. It is useful in making command, apply state(e.g. ApplyCommit), check confilct.

func (*MVCCSlice) GetVisibleNode added in v0.6.0

func (be *MVCCSlice) GetVisibleNode(ts types.TS) (node txnif.MVCCNode)

func (*MVCCSlice) InsertNode added in v0.6.0

func (be *MVCCSlice) InsertNode(un txnif.MVCCNode)

func (*MVCCSlice) IsCommitted added in v0.6.0

func (be *MVCCSlice) IsCommitted() bool

func (*MVCCSlice) IsCommitting added in v0.6.0

func (be *MVCCSlice) IsCommitting() bool

func (*MVCCSlice) IsEmpty added in v0.6.0

func (be *MVCCSlice) IsEmpty() bool

func (*MVCCSlice) LoopInRange added in v0.6.0

func (be *MVCCSlice) LoopInRange(start, end types.TS, fn func(txnif.MVCCNode) bool) (indexes []*wal.Index)

func (*MVCCSlice) LoopOffsetRange added in v0.6.0

func (be *MVCCSlice) LoopOffsetRange(start, end int, fn func(txnif.MVCCNode) bool)

func (*MVCCSlice) SearchNode added in v0.6.0

func (be *MVCCSlice) SearchNode(o txnif.MVCCNode) (node txnif.MVCCNode)

func (*MVCCSlice) SearchNodeByCompareFn added in v0.6.0

func (be *MVCCSlice) SearchNodeByCompareFn(fn func(a txnif.MVCCNode) int) (offset int, node txnif.MVCCNode)

func (*MVCCSlice) SearchNodeByTS added in v0.6.0

func (be *MVCCSlice) SearchNodeByTS(ts types.TS) (node txnif.MVCCNode)

func (*MVCCSlice) StringLocked added in v0.6.0

func (be *MVCCSlice) StringLocked() string

type Node

type Node interface {
	base.INode
	Type() NodeType
	ToTransient()
	Close() error
}

type NodeState

type NodeState = int32
const (
	TransientNode NodeState = iota
	PersistNode
)

type NodeType

type NodeType int8

type NoopCommitListener added in v0.6.0

type NoopCommitListener struct{}

func (*NoopCommitListener) OnBeginPrePrepare added in v0.6.0

func (bl *NoopCommitListener) OnBeginPrePrepare(txn txnif.AsyncTxn)

func (*NoopCommitListener) OnEndPrePrepare added in v0.6.0

func (bl *NoopCommitListener) OnEndPrePrepare(txn txnif.AsyncTxn)

type NoopTxnStore

type NoopTxnStore struct{}

func (*NoopTxnStore) AddBlksWithMetaLoc added in v0.7.0

func (store *NoopTxnStore) AddBlksWithMetaLoc(
	dbId, tid uint64,
	pks []containers.Vector,
	file string,
	metaLocs []string,
	flag int32) error

func (*NoopTxnStore) AddTxnEntry

func (store *NoopTxnStore) AddTxnEntry(t txnif.TxnEntryType, entry txnif.TxnEntry)

func (*NoopTxnStore) Append

func (store *NoopTxnStore) Append(dbId, id uint64, data *containers.Batch) error

func (*NoopTxnStore) Apply2PCPrepare added in v0.6.0

func (store *NoopTxnStore) Apply2PCPrepare() error

func (*NoopTxnStore) ApplyCommit

func (store *NoopTxnStore) ApplyCommit() error

func (*NoopTxnStore) ApplyRollback

func (store *NoopTxnStore) ApplyRollback() error

func (*NoopTxnStore) BatchDedup

func (store *NoopTxnStore) BatchDedup(uint64, uint64, containers.Vector) (err error)

func (*NoopTxnStore) BindTxn

func (store *NoopTxnStore) BindTxn(txn txnif.AsyncTxn)

func (*NoopTxnStore) Close

func (store *NoopTxnStore) Close() error

func (*NoopTxnStore) CreateBlock

func (store *NoopTxnStore) CreateBlock(uint64, uint64, uint64, bool) (blk handle.Block, err error)

func (*NoopTxnStore) CreateDatabase

func (store *NoopTxnStore) CreateDatabase(name, creatSql string) (db handle.Database, err error)

func (*NoopTxnStore) CreateDatabaseWithID added in v0.6.0

func (store *NoopTxnStore) CreateDatabaseWithID(name, createSql string, id uint64) (db handle.Database, err error)

func (*NoopTxnStore) CreateNonAppendableBlock

func (store *NoopTxnStore) CreateNonAppendableBlock(dbId uint64, id *common.ID) (blk handle.Block, err error)

func (*NoopTxnStore) CreateNonAppendableBlockWithMeta added in v0.7.0

func (store *NoopTxnStore) CreateNonAppendableBlockWithMeta(
	_ uint64,
	_ *common.ID,
	_ string,
	_ string) (blk handle.Block, err error)

func (*NoopTxnStore) CreateNonAppendableSegment

func (store *NoopTxnStore) CreateNonAppendableSegment(dbId, tid uint64, _ bool) (seg handle.Segment, err error)

func (*NoopTxnStore) CreateRelation

func (store *NoopTxnStore) CreateRelation(dbId uint64, def any) (rel handle.Relation, err error)

func (*NoopTxnStore) CreateRelationWithTableId added in v0.6.0

func (store *NoopTxnStore) CreateRelationWithTableId(dbId uint64, tableId uint64, def any) (rel handle.Relation, err error)

func (*NoopTxnStore) CreateSegment

func (store *NoopTxnStore) CreateSegment(dbId, tid uint64, is1PC bool) (seg handle.Segment, err error)

func (*NoopTxnStore) DatabaseNames

func (store *NoopTxnStore) DatabaseNames() (names []string)

func (*NoopTxnStore) DropDatabase

func (store *NoopTxnStore) DropDatabase(name string) (db handle.Database, err error)

func (*NoopTxnStore) DropDatabaseByID added in v0.6.0

func (store *NoopTxnStore) DropDatabaseByID(id uint64) (db handle.Database, err error)

func (*NoopTxnStore) DropRelationByID added in v0.6.0

func (store *NoopTxnStore) DropRelationByID(dbId uint64, id uint64) (rel handle.Relation, err error)

func (*NoopTxnStore) DropRelationByName

func (store *NoopTxnStore) DropRelationByName(dbId uint64, name string) (rel handle.Relation, err error)

func (*NoopTxnStore) GetBlock

func (store *NoopTxnStore) GetBlock(dbId uint64, id *common.ID) (blk handle.Block, err error)

func (*NoopTxnStore) GetByFilter

func (store *NoopTxnStore) GetByFilter(uint64, uint64, *handle.Filter) (id *common.ID, offset uint32, err error)

func (*NoopTxnStore) GetDatabase

func (store *NoopTxnStore) GetDatabase(name string) (db handle.Database, err error)

func (*NoopTxnStore) GetDatabaseByID added in v0.6.0

func (store *NoopTxnStore) GetDatabaseByID(id uint64) (db handle.Database, err error)

func (*NoopTxnStore) GetDirty added in v0.6.0

func (store *NoopTxnStore) GetDirty() *common.Tree

func (*NoopTxnStore) GetDirtyTableByID added in v0.6.0

func (store *NoopTxnStore) GetDirtyTableByID(id uint64) *common.TableTree

func (*NoopTxnStore) GetLSN

func (store *NoopTxnStore) GetLSN() uint64

func (*NoopTxnStore) GetRelationByID added in v0.6.0

func (store *NoopTxnStore) GetRelationByID(dbId uint64, id uint64) (rel handle.Relation, err error)

func (*NoopTxnStore) GetRelationByName

func (store *NoopTxnStore) GetRelationByName(dbId uint64, name string) (rel handle.Relation, err error)

func (*NoopTxnStore) GetSegment

func (store *NoopTxnStore) GetSegment(dbId uint64, id *common.ID) (seg handle.Segment, err error)

func (*NoopTxnStore) GetValue

func (store *NoopTxnStore) GetValue(uint64, *common.ID, uint32, uint16) (v any, err error)

func (*NoopTxnStore) HasAnyTableDataChanges added in v0.6.0

func (store *NoopTxnStore) HasAnyTableDataChanges() bool

func (*NoopTxnStore) HasCatalogChanges added in v0.6.0

func (store *NoopTxnStore) HasCatalogChanges() bool

func (*NoopTxnStore) HasTableDataChanges added in v0.6.0

func (store *NoopTxnStore) HasTableDataChanges(id uint64) bool

func (*NoopTxnStore) IncreateWriteCnt

func (store *NoopTxnStore) IncreateWriteCnt() int

func (*NoopTxnStore) IsReadonly

func (store *NoopTxnStore) IsReadonly() bool

func (*NoopTxnStore) LogBlockID

func (store *NoopTxnStore) LogBlockID(dbId, tid, bid uint64)

func (*NoopTxnStore) LogSegmentID

func (store *NoopTxnStore) LogSegmentID(dbId, tid, sid uint64)

func (*NoopTxnStore) LogTxnEntry

func (store *NoopTxnStore) LogTxnEntry(dbId, tableId uint64, entry txnif.TxnEntry, readed []*common.ID) (err error)

func (*NoopTxnStore) LogTxnState added in v0.6.0

func (store *NoopTxnStore) LogTxnState(sync bool) (logEntry entry.Entry, err error)

func (*NoopTxnStore) PreApplyCommit

func (store *NoopTxnStore) PreApplyCommit() error

func (*NoopTxnStore) PrePrepare added in v0.6.0

func (store *NoopTxnStore) PrePrepare() error

func (*NoopTxnStore) PrepareCommit

func (store *NoopTxnStore) PrepareCommit() error

func (*NoopTxnStore) PrepareRollback

func (store *NoopTxnStore) PrepareRollback() error

func (*NoopTxnStore) RangeDelete

func (store *NoopTxnStore) RangeDelete(uint64, *common.ID, uint32, uint32, handle.DeleteType) (err error)

func (*NoopTxnStore) SoftDeleteBlock

func (store *NoopTxnStore) SoftDeleteBlock(dbId uint64, id *common.ID) (err error)

func (*NoopTxnStore) SoftDeleteSegment

func (store *NoopTxnStore) SoftDeleteSegment(dbId uint64, id *common.ID) (err error)

func (*NoopTxnStore) UnsafeGetDatabase added in v0.6.0

func (store *NoopTxnStore) UnsafeGetDatabase(id uint64) (db handle.Database, err error)

func (*NoopTxnStore) UnsafeGetRelation added in v0.6.0

func (store *NoopTxnStore) UnsafeGetRelation(dbId, id uint64) (rel handle.Relation, err error)

func (*NoopTxnStore) Update

func (store *NoopTxnStore) Update(uint64, *common.ID, uint32, uint16, any) (err error)

func (*NoopTxnStore) UpdateDeltaLoc added in v0.6.0

func (store *NoopTxnStore) UpdateDeltaLoc(dbId uint64, id *common.ID, un string) (err error)

func (*NoopTxnStore) UpdateMetaLoc added in v0.6.0

func (store *NoopTxnStore) UpdateMetaLoc(dbId uint64, id *common.ID, un string) (err error)

func (*NoopTxnStore) WaitPrepared added in v0.6.0

func (store *NoopTxnStore) WaitPrepared() (err error)

type OpTxn

type OpTxn struct {
	Txn txnif.AsyncTxn
	Op  OpType
}

func (*OpTxn) Is2PC added in v0.6.0

func (txn *OpTxn) Is2PC() bool

func (*OpTxn) IsReplay added in v0.6.0

func (txn *OpTxn) IsReplay() bool

func (*OpTxn) IsTryCommitting added in v0.6.0

func (txn *OpTxn) IsTryCommitting() bool

func (*OpTxn) Repr

func (txn *OpTxn) Repr() string

type OpType

type OpType int8

type PointerCmd

type PointerCmd struct {
	BaseCmd
	Group uint32
	Lsn   uint64
}

func (*PointerCmd) ApplyCommit added in v0.6.0

func (e *PointerCmd) ApplyCommit()

func (*PointerCmd) ApplyRollback added in v0.6.0

func (e *PointerCmd) ApplyRollback()

func (*PointerCmd) Desc

func (e *PointerCmd) Desc() string

func (*PointerCmd) GetType

func (e *PointerCmd) GetType() int16

func (*PointerCmd) Marshal

func (e *PointerCmd) Marshal() (buf []byte, err error)

func (*PointerCmd) ReadFrom

func (e *PointerCmd) ReadFrom(r io.Reader) (n int64, err error)

func (*PointerCmd) SetReplayTxn added in v0.6.0

func (e *PointerCmd) SetReplayTxn(_ txnif.AsyncTxn)

func (*PointerCmd) String

func (e *PointerCmd) String() string

func (*PointerCmd) Unmarshal

func (e *PointerCmd) Unmarshal(buf []byte) error

func (*PointerCmd) VerboseString

func (e *PointerCmd) VerboseString() string

func (*PointerCmd) WriteTo

func (e *PointerCmd) WriteTo(w io.Writer) (n int64, err error)

type Txn

type Txn struct {
	*TxnCtx
	Mgr                      *TxnManager
	Store                    txnif.TxnStore
	Err                      error
	LSN                      uint64
	TenantID, UserID, RoleID atomic.Uint32

	PrepareCommitFn   func(txnif.AsyncTxn) error
	PrepareRollbackFn func(txnif.AsyncTxn) error
	ApplyCommitFn     func(txnif.AsyncTxn) error
	ApplyRollbackFn   func(txnif.AsyncTxn) error
	// contains filtered or unexported fields
}

func NewPersistedTxn added in v0.6.0

func NewPersistedTxn(
	mgr *TxnManager,
	ctx *TxnCtx,
	store txnif.TxnStore,
	lsn uint64,
	prepareCommitFn func(txnif.AsyncTxn) error,
	prepareRollbackFn func(txnif.AsyncTxn) error,
	applyCommitFn func(txnif.AsyncTxn) error,
	applyRollbackFn func(txnif.AsyncTxn) error) *Txn

func NewTxn

func NewTxn(mgr *TxnManager, store txnif.TxnStore, txnId []byte, start types.TS, info []byte) *Txn

func (*Txn) ApplyCommit

func (txn *Txn) ApplyCommit() (err error)

func (*Txn) ApplyRollback

func (txn *Txn) ApplyRollback() (err error)

func (*Txn) BindAccessInfo added in v0.6.0

func (txn *Txn) BindAccessInfo(tenantID, userID, roleID uint32)

func (*Txn) Commit

func (txn *Txn) Commit() (err error)

Commit is used to commit a 1PC or 2PC transaction running on Coordinator or running on Participant. Notice that the Commit of a 2PC transaction must be success once the Commit message arrives, since Preparing had already succeeded.

func (*Txn) CommitInRecovery added in v0.6.0

func (txn *Txn) CommitInRecovery() (err error)

CommitInRecovery is called during recovery

func (*Txn) Committing added in v0.6.0

func (txn *Txn) Committing() (err error)

Committing is used to record a "committing" status for coordinator. Notice that txn must commit successfully once committing message arrives, since Preparing had already succeeded.

func (*Txn) CommittingInRecovery added in v0.6.0

func (txn *Txn) CommittingInRecovery() (err error)

func (*Txn) CreateDatabase

func (txn *Txn) CreateDatabase(name, createSql string) (db handle.Database, err error)

func (*Txn) CreateDatabaseWithID added in v0.6.0

func (txn *Txn) CreateDatabaseWithID(name, createSql string, id uint64) (db handle.Database, err error)

func (*Txn) CurrentDatabase

func (txn *Txn) CurrentDatabase() (db handle.Database)

func (*Txn) DatabaseNames

func (txn *Txn) DatabaseNames() (names []string)

func (*Txn) DoneWithErr

func (txn *Txn) DoneWithErr(err error, isAbort bool)

func (*Txn) DropDatabase

func (txn *Txn) DropDatabase(name string) (db handle.Database, err error)

func (*Txn) DropDatabaseByID added in v0.6.0

func (txn *Txn) DropDatabaseByID(id uint64) (db handle.Database, err error)

func (*Txn) GetDatabase

func (txn *Txn) GetDatabase(name string) (db handle.Database, err error)

func (*Txn) GetDatabaseByID added in v0.6.0

func (txn *Txn) GetDatabaseByID(id uint64) (db handle.Database, err error)

func (*Txn) GetError

func (txn *Txn) GetError() error

func (*Txn) GetLSN

func (txn *Txn) GetLSN() uint64

func (*Txn) GetLsn added in v0.6.0

func (txn *Txn) GetLsn() uint64

func (*Txn) GetStore

func (txn *Txn) GetStore() txnif.TxnStore

func (*Txn) GetTenantID added in v0.6.0

func (txn *Txn) GetTenantID() uint32

func (*Txn) GetUserAndRoleID added in v0.6.0

func (txn *Txn) GetUserAndRoleID() (uint32, uint32)

func (*Txn) IsReplay added in v0.6.0

func (txn *Txn) IsReplay() bool

func (*Txn) LogTxnEntry

func (txn *Txn) LogTxnEntry(dbId, tableId uint64, entry txnif.TxnEntry, readed []*common.ID) (err error)

func (*Txn) LogTxnState added in v0.6.0

func (txn *Txn) LogTxnState(sync bool) (logEntry entry.Entry, err error)

func (*Txn) MarshalLogObject

func (txn *Txn) MarshalLogObject(enc zapcore.ObjectEncoder) (err error)

func (*Txn) MockIncWriteCnt

func (txn *Txn) MockIncWriteCnt() int

func (*Txn) PreApplyCommit

func (txn *Txn) PreApplyCommit() (err error)

func (*Txn) PrePrepare added in v0.6.0

func (txn *Txn) PrePrepare() error

func (*Txn) Prepare added in v0.6.0

func (txn *Txn) Prepare() (pts types.TS, err error)

Prepare is used to pre-commit a 2PC distributed transaction. Notice that once any error happened, we should rollback the txn. TODO:

  1. How to handle the case in which log service timed out?
  2. For a 2pc transaction, Rollback message may arrive before Prepare message, should handle this case by TxnStorage?

func (*Txn) PrepareCommit

func (txn *Txn) PrepareCommit() (err error)

func (*Txn) PrepareRollback

func (txn *Txn) PrepareRollback() (err error)

func (*Txn) Rollback

func (txn *Txn) Rollback() (err error)

Rollback is used to roll back a 1PC or 2PC transaction. Notice that there may be a such scenario in which a 2PC distributed transaction in ACTIVE will be rollbacked, since Rollback message may arrive before the Prepare message.

func (*Txn) SetApplyCommitFn

func (txn *Txn) SetApplyCommitFn(fn func(txnif.AsyncTxn) error)

func (*Txn) SetApplyRollbackFn

func (txn *Txn) SetApplyRollbackFn(fn func(txnif.AsyncTxn) error)

func (*Txn) SetError

func (txn *Txn) SetError(err error)

func (*Txn) SetPrepareCommitFn

func (txn *Txn) SetPrepareCommitFn(fn func(txnif.AsyncTxn) error)

func (*Txn) SetPrepareRollbackFn

func (txn *Txn) SetPrepareRollbackFn(fn func(txnif.AsyncTxn) error)

func (*Txn) String

func (txn *Txn) String() string

func (*Txn) UnsafeGetDatabase added in v0.6.0

func (txn *Txn) UnsafeGetDatabase(id uint64) (db handle.Database, err error)

func (*Txn) UnsafeGetRelation added in v0.6.0

func (txn *Txn) UnsafeGetRelation(dbId, id uint64) (db handle.Relation, err error)

func (*Txn) UseDatabase

func (txn *Txn) UseDatabase(name string) (err error)

func (*Txn) WaitDone

func (txn *Txn) WaitDone(err error, isAbort bool) error

func (*Txn) WaitPrepared added in v0.6.0

func (txn *Txn) WaitPrepared() error

type TxnBlock

type TxnBlock struct {
	Txn txnif.AsyncTxn
	Seg handle.Segment
}

func (*TxnBlock) Append

func (blk *TxnBlock) Append(*containers.Batch, uint32) (n uint32, err error)

func (*TxnBlock) BatchDedup

func (blk *TxnBlock) BatchDedup(containers.Vector) (err error)

func (*TxnBlock) Close

func (blk *TxnBlock) Close() error

func (*TxnBlock) Fingerprint

func (blk *TxnBlock) Fingerprint() *common.ID

func (*TxnBlock) GetByFilter

func (blk *TxnBlock) GetByFilter(*handle.Filter) (offset uint32, err error)

func (*TxnBlock) GetMeta

func (blk *TxnBlock) GetMeta() any

func (*TxnBlock) GetSegment

func (blk *TxnBlock) GetSegment() (seg handle.Segment)

func (*TxnBlock) GetTotalChanges

func (blk *TxnBlock) GetTotalChanges() int

func (*TxnBlock) ID

func (blk *TxnBlock) ID() uint64

func (*TxnBlock) IsAppendableBlock

func (blk *TxnBlock) IsAppendableBlock() bool

func (*TxnBlock) PushDeleteOp

func (blk *TxnBlock) PushDeleteOp(handle.Filter) (err error)

func (*TxnBlock) PushUpdateOp

func (blk *TxnBlock) PushUpdateOp(handle.Filter, string, any) (err error)

func (*TxnBlock) RangeDelete

func (blk *TxnBlock) RangeDelete(uint32, uint32, handle.DeleteType) (err error)

func (*TxnBlock) Rows

func (blk *TxnBlock) Rows() int

func (*TxnBlock) String

func (blk *TxnBlock) String() string

func (*TxnBlock) Update

func (blk *TxnBlock) Update(uint32, uint16, any) (err error)

type TxnCmd added in v0.6.0

type TxnCmd struct {
	*ComposedCmd
	*TxnCtx
	Txn txnif.AsyncTxn
}

func NewEmptyTxnCmd added in v0.6.0

func NewEmptyTxnCmd() *TxnCmd

func NewTxnCmd added in v0.6.0

func NewTxnCmd() *TxnCmd

func (*TxnCmd) ApplyCommit added in v0.6.0

func (c *TxnCmd) ApplyCommit()

func (*TxnCmd) ApplyRollback added in v0.6.0

func (c *TxnCmd) ApplyRollback()

func (*TxnCmd) Close added in v0.6.0

func (c *TxnCmd) Close()

func (*TxnCmd) Desc added in v0.6.0

func (c *TxnCmd) Desc() string

func (*TxnCmd) GetType added in v0.6.0

func (c *TxnCmd) GetType() int16

func (*TxnCmd) Marshal added in v0.6.0

func (c *TxnCmd) Marshal() (buf []byte, err error)

func (*TxnCmd) ReadFrom added in v0.6.0

func (c *TxnCmd) ReadFrom(r io.Reader) (n int64, err error)

func (*TxnCmd) SetReplayTxn added in v0.6.0

func (c *TxnCmd) SetReplayTxn(txn txnif.AsyncTxn)

func (*TxnCmd) SetTxn added in v0.6.0

func (c *TxnCmd) SetTxn(txn txnif.AsyncTxn)

func (*TxnCmd) String added in v0.6.0

func (c *TxnCmd) String() string

func (*TxnCmd) Unmarshal added in v0.6.0

func (c *TxnCmd) Unmarshal(buf []byte) (err error)

func (*TxnCmd) VerboseString added in v0.6.0

func (c *TxnCmd) VerboseString() string

func (*TxnCmd) WriteTo added in v0.6.0

func (c *TxnCmd) WriteTo(w io.Writer) (n int64, err error)

type TxnCommitListener added in v0.6.0

type TxnCommitListener interface {
	OnBeginPrePrepare(txnif.AsyncTxn)
	OnEndPrePrepare(txnif.AsyncTxn)
}

type TxnCtx

type TxnCtx struct {
	sync.RWMutex
	sync.WaitGroup
	DoneCond                     sync.Cond
	ID                           string
	IDCtx                        []byte
	StartTS, CommitTS, PrepareTS types.TS
	Info                         []byte
	State                        txnif.TxnState
	Participants                 []uint64

	// Memo is not thread-safe
	// It will be readonly when txn state is not txnif.TxnStateActive
	Memo *txnif.TxnMemo
}

func NewEmptyTxnCtx added in v0.6.0

func NewEmptyTxnCtx() *TxnCtx

func NewTxnCtx

func NewTxnCtx(id []byte, start types.TS, info []byte) *TxnCtx

func (*TxnCtx) CommitAfter added in v0.5.1

func (ctx *TxnCtx) CommitAfter(startTs types.TS) bool

func (*TxnCtx) CommitBefore added in v0.5.1

func (ctx *TxnCtx) CommitBefore(startTs types.TS) bool

func (*TxnCtx) GetCommitTS

func (ctx *TxnCtx) GetCommitTS() types.TS

func (*TxnCtx) GetCtx

func (ctx *TxnCtx) GetCtx() []byte

func (*TxnCtx) GetID

func (ctx *TxnCtx) GetID() string

func (*TxnCtx) GetInfo

func (ctx *TxnCtx) GetInfo() []byte

func (*TxnCtx) GetMemo added in v0.6.0

func (ctx *TxnCtx) GetMemo() *txnif.TxnMemo

func (*TxnCtx) GetParticipants added in v0.6.0

func (ctx *TxnCtx) GetParticipants() []uint64

func (*TxnCtx) GetPrepareTS added in v0.6.0

func (ctx *TxnCtx) GetPrepareTS() types.TS

func (*TxnCtx) GetStartTS

func (ctx *TxnCtx) GetStartTS() types.TS

func (*TxnCtx) GetTxnState added in v0.6.0

func (ctx *TxnCtx) GetTxnState(waitIfCommitting bool) (state txnif.TxnState)

False when atomically get the current txn state

True when the txn state is committing, wait it to be committed or rollbacked. It is used during snapshot reads. If TxnStateActive is currently returned, this value will definitely not be used, because even if it becomes TxnStatePreparing later, the timestamp would be larger than the current read timestamp.

func (*TxnCtx) Is2PC added in v0.6.0

func (ctx *TxnCtx) Is2PC() bool

func (*TxnCtx) IsActiveLocked

func (ctx *TxnCtx) IsActiveLocked() bool

func (*TxnCtx) IsReplay added in v0.6.0

func (ctx *TxnCtx) IsReplay() bool

func (*TxnCtx) IsVisible

func (ctx *TxnCtx) IsVisible(o txnif.TxnReader) bool

func (*TxnCtx) Repr

func (ctx *TxnCtx) Repr() string

func (*TxnCtx) SameTxn added in v0.5.1

func (ctx *TxnCtx) SameTxn(startTs types.TS) bool

func (*TxnCtx) SetCommitTS added in v0.6.0

func (ctx *TxnCtx) SetCommitTS(cts types.TS) (err error)

func (*TxnCtx) SetParticipants added in v0.6.0

func (ctx *TxnCtx) SetParticipants(ids []uint64) (err error)

func (*TxnCtx) String

func (ctx *TxnCtx) String() string

func (*TxnCtx) ToCommittedLocked

func (ctx *TxnCtx) ToCommittedLocked() error

func (*TxnCtx) ToCommittingFinished added in v0.6.0

func (ctx *TxnCtx) ToCommittingFinished() (err error)

func (*TxnCtx) ToCommittingFinishedLocked added in v0.6.0

func (ctx *TxnCtx) ToCommittingFinishedLocked() (err error)

func (*TxnCtx) ToPrepared added in v0.6.0

func (ctx *TxnCtx) ToPrepared() (err error)

func (*TxnCtx) ToPreparedLocked added in v0.6.0

func (ctx *TxnCtx) ToPreparedLocked() (err error)

func (*TxnCtx) ToPreparingLocked added in v0.6.0

func (ctx *TxnCtx) ToPreparingLocked(ts types.TS) error

func (*TxnCtx) ToRollbackedLocked

func (ctx *TxnCtx) ToRollbackedLocked() error

func (*TxnCtx) ToRollbacking added in v0.6.0

func (ctx *TxnCtx) ToRollbacking(ts types.TS) error

func (*TxnCtx) ToRollbackingLocked

func (ctx *TxnCtx) ToRollbackingLocked(ts types.TS) error

func (*TxnCtx) ToUnknownLocked

func (ctx *TxnCtx) ToUnknownLocked()

type TxnDatabase

type TxnDatabase struct {
	Txn txnif.AsyncTxn
}

func (*TxnDatabase) Close

func (db *TxnDatabase) Close() error

func (*TxnDatabase) CreateRelation

func (db *TxnDatabase) CreateRelation(def any) (rel handle.Relation, err error)

func (*TxnDatabase) DropRelationByName

func (db *TxnDatabase) DropRelationByName(name string) (rel handle.Relation, err error)

func (*TxnDatabase) GetID

func (db *TxnDatabase) GetID() uint64

func (*TxnDatabase) GetMeta

func (db *TxnDatabase) GetMeta() any

func (*TxnDatabase) GetName

func (db *TxnDatabase) GetName() string

func (*TxnDatabase) GetRelationByName

func (db *TxnDatabase) GetRelationByName(name string) (rel handle.Relation, err error)

func (*TxnDatabase) MakeRelationIt

func (db *TxnDatabase) MakeRelationIt() (it handle.RelationIt)

func (*TxnDatabase) RelationCnt

func (db *TxnDatabase) RelationCnt() int64

func (*TxnDatabase) Relations

func (db *TxnDatabase) Relations() (rels []handle.Relation)

func (*TxnDatabase) String

func (db *TxnDatabase) String() string

func (*TxnDatabase) UnsafeGetRelation added in v0.6.0

func (db *TxnDatabase) UnsafeGetRelation(id uint64) (rel handle.Relation, err error)

type TxnFactory

type TxnFactory = func(*TxnManager, txnif.TxnStore, []byte, types.TS, []byte) txnif.AsyncTxn

type TxnMVCCNode added in v0.6.0

type TxnMVCCNode struct {
	Start, Prepare, End types.TS
	Txn                 txnif.TxnReader
	Aborted             bool

	LogIndex *wal.Index
	// contains filtered or unexported fields
}

func NewTxnMVCCNodeWithTS added in v0.6.0

func NewTxnMVCCNodeWithTS(ts types.TS) *TxnMVCCNode

func NewTxnMVCCNodeWithTxn added in v0.6.0

func NewTxnMVCCNodeWithTxn(txn txnif.TxnReader) *TxnMVCCNode

func ReadTuple added in v0.6.0

func ReadTuple(bat *containers.Batch, row int) (un *TxnMVCCNode)

func (*TxnMVCCNode) AppendTuple added in v0.6.0

func (un *TxnMVCCNode) AppendTuple(bat *containers.Batch)

func (*TxnMVCCNode) ApplyCommit added in v0.6.0

func (un *TxnMVCCNode) ApplyCommit(index *wal.Index) (ts types.TS, err error)

func (*TxnMVCCNode) ApplyRollback added in v0.6.0

func (un *TxnMVCCNode) ApplyRollback(index *wal.Index) (ts types.TS, err error)

func (*TxnMVCCNode) CheckConflict added in v0.6.0

func (un *TxnMVCCNode) CheckConflict(ts types.TS) error

Check w-w confilct

func (*TxnMVCCNode) CloneAll added in v0.6.0

func (un *TxnMVCCNode) CloneAll() *TxnMVCCNode

func (*TxnMVCCNode) Close added in v0.7.0

func (un *TxnMVCCNode) Close()

func (*TxnMVCCNode) CommittedIn added in v0.6.0

func (un *TxnMVCCNode) CommittedIn(minTS, maxTS types.TS) (in, before bool)

in indicates whether this node is committed in between [minTs, maxTs] before indicates whether this node is committed before minTs NeedWaitCommitting should be called before to make sure all prepared active txns in between [minTs, maxTs] be committed or rollbacked

func (*TxnMVCCNode) Compare added in v0.6.0

func (un *TxnMVCCNode) Compare(o *TxnMVCCNode) int

func (*TxnMVCCNode) Compare2 added in v0.6.0

func (un *TxnMVCCNode) Compare2(o *TxnMVCCNode) int

func (*TxnMVCCNode) GetEnd added in v0.6.0

func (un *TxnMVCCNode) GetEnd() types.TS

func (*TxnMVCCNode) GetLogIndex added in v0.6.0

func (un *TxnMVCCNode) GetLogIndex() *wal.Index

func (*TxnMVCCNode) GetPrepare added in v0.6.0

func (un *TxnMVCCNode) GetPrepare() types.TS

func (*TxnMVCCNode) GetStart added in v0.6.0

func (un *TxnMVCCNode) GetStart() types.TS

func (*TxnMVCCNode) GetTxn added in v0.6.0

func (un *TxnMVCCNode) GetTxn() txnif.TxnReader

func (*TxnMVCCNode) Is1PC added in v0.6.0

func (un *TxnMVCCNode) Is1PC() bool

func (*TxnMVCCNode) IsAborted added in v0.6.0

func (un *TxnMVCCNode) IsAborted() bool

func (*TxnMVCCNode) IsActive added in v0.6.0

func (un *TxnMVCCNode) IsActive() bool

func (*TxnMVCCNode) IsCommitted added in v0.6.0

func (un *TxnMVCCNode) IsCommitted() bool

func (*TxnMVCCNode) IsCommitting added in v0.6.0

func (un *TxnMVCCNode) IsCommitting() bool

func (*TxnMVCCNode) IsSameTxn added in v0.6.0

func (un *TxnMVCCNode) IsSameTxn(ts types.TS) bool

func (*TxnMVCCNode) IsVisible added in v0.6.0

func (un *TxnMVCCNode) IsVisible(ts types.TS) (visible bool)

Check whether is mvcc node is visible to ts Make sure all the relevant prepared txns should be committed|rollbacked

func (*TxnMVCCNode) NeedWaitCommitting added in v0.6.0

func (un *TxnMVCCNode) NeedWaitCommitting(ts types.TS) (bool, txnif.TxnReader)

Check whether need to wait this mvcc node

func (*TxnMVCCNode) PrepareCommit added in v0.6.0

func (un *TxnMVCCNode) PrepareCommit() (ts types.TS, err error)

func (*TxnMVCCNode) PrepareRollback added in v0.6.0

func (un *TxnMVCCNode) PrepareRollback() (err error)

func (*TxnMVCCNode) PreparedIn added in v0.6.0

func (un *TxnMVCCNode) PreparedIn(minTS, maxTS types.TS) (in, before bool)

func (*TxnMVCCNode) ReadFrom added in v0.6.0

func (un *TxnMVCCNode) ReadFrom(r io.Reader) (n int64, err error)

func (*TxnMVCCNode) ReadTuple added in v0.6.0

func (un *TxnMVCCNode) ReadTuple(bat *containers.Batch, offset int)

func (*TxnMVCCNode) Set1PC added in v0.6.0

func (un *TxnMVCCNode) Set1PC()

func (*TxnMVCCNode) SetLogIndex added in v0.6.0

func (un *TxnMVCCNode) SetLogIndex(idx *wal.Index)

func (*TxnMVCCNode) String added in v0.6.0

func (un *TxnMVCCNode) String() string

func (*TxnMVCCNode) Update added in v0.6.0

func (un *TxnMVCCNode) Update(o *TxnMVCCNode)

func (*TxnMVCCNode) WriteTo added in v0.6.0

func (un *TxnMVCCNode) WriteTo(w io.Writer) (n int64, err error)

type TxnManager

type TxnManager struct {
	sync.RWMutex
	common.ClosedState
	PreparingSM     sm.StateMachine
	IDMap           map[string]txnif.AsyncTxn
	IdAlloc         *common.TxnIDAllocator
	TsAlloc         *types.TsAlloctor
	TxnStoreFactory TxnStoreFactory
	TxnFactory      TxnFactory
	Exception       *atomic.Value
	CommitListener  *batchTxnCommitListener
}

func NewTxnManager

func NewTxnManager(txnStoreFactory TxnStoreFactory, txnFactory TxnFactory, clock clock.Clock) *TxnManager

func (*TxnManager) DeleteTxn

func (mgr *TxnManager) DeleteTxn(id string) (err error)

func (*TxnManager) EnqueueFlushing added in v0.6.0

func (mgr *TxnManager) EnqueueFlushing(op any) (err error)

func (*TxnManager) GetOrCreateTxnWithMeta added in v0.6.0

func (mgr *TxnManager) GetOrCreateTxnWithMeta(
	info []byte,
	id []byte,
	ts types.TS) (txn txnif.AsyncTxn, err error)

GetOrCreateTxnWithMeta Get or create a txn initiated by CN

func (*TxnManager) GetTxn

func (mgr *TxnManager) GetTxn(id string) txnif.AsyncTxn

func (*TxnManager) GetTxnByCtx

func (mgr *TxnManager) GetTxnByCtx(ctx []byte) txnif.AsyncTxn

func (*TxnManager) Init

func (mgr *TxnManager) Init(prevTs types.TS) error

func (*TxnManager) MarshalLogObject

func (mgr *TxnManager) MarshalLogObject(enc zapcore.ObjectEncoder) (err error)

func (*TxnManager) MinTSForTest added in v0.7.0

func (mgr *TxnManager) MinTSForTest() types.TS

MinTSForTest is only be used in ut to ensure that files that have been gc will not be used.

func (*TxnManager) OnException

func (mgr *TxnManager) OnException(new error)

func (*TxnManager) OnOpTxn

func (mgr *TxnManager) OnOpTxn(op *OpTxn) (err error)

func (*TxnManager) OnReplayTxn added in v0.6.0

func (mgr *TxnManager) OnReplayTxn(txn txnif.AsyncTxn) (err error)

Note: Replay should always runs in a single thread

func (*TxnManager) Start added in v0.6.0

func (mgr *TxnManager) Start()

func (*TxnManager) StartTxn

func (mgr *TxnManager) StartTxn(info []byte) (txn txnif.AsyncTxn, err error)

StartTxn starts a local transaction initiated by DN

func (*TxnManager) StatMaxCommitTS added in v0.6.0

func (mgr *TxnManager) StatMaxCommitTS() (ts types.TS)

func (*TxnManager) Stop

func (mgr *TxnManager) Stop()

type TxnRelation

type TxnRelation struct {
	Txn txnif.AsyncTxn
	DB  handle.Database
}

func (*TxnRelation) AddBlksWithMetaLoc added in v0.7.0

func (rel *TxnRelation) AddBlksWithMetaLoc([]containers.Vector, string, []string, int32) error

func (*TxnRelation) Append

func (rel *TxnRelation) Append(data *containers.Batch) error

func (*TxnRelation) BatchDedup

func (rel *TxnRelation) BatchDedup(col containers.Vector) error

func (*TxnRelation) Close

func (rel *TxnRelation) Close() error

func (*TxnRelation) CreateNonAppendableSegment

func (rel *TxnRelation) CreateNonAppendableSegment(bool) (seg handle.Segment, err error)

func (*TxnRelation) CreateSegment

func (rel *TxnRelation) CreateSegment(bool) (seg handle.Segment, err error)

func (*TxnRelation) DeleteByFilter

func (rel *TxnRelation) DeleteByFilter(filter *handle.Filter) (err error)

func (*TxnRelation) DeleteByPhyAddrKey added in v0.6.0

func (rel *TxnRelation) DeleteByPhyAddrKey(any) (err error)

func (*TxnRelation) DeleteByPhyAddrKeys added in v0.6.0

func (rel *TxnRelation) DeleteByPhyAddrKeys(containers.Vector) (err error)

func (*TxnRelation) GetByFilter

func (rel *TxnRelation) GetByFilter(*handle.Filter) (id *common.ID, offset uint32, err error)

func (*TxnRelation) GetCardinality

func (rel *TxnRelation) GetCardinality(attr string) int64

func (*TxnRelation) GetMeta

func (rel *TxnRelation) GetMeta() any

func (*TxnRelation) GetSegment

func (rel *TxnRelation) GetSegment(id uint64) (seg handle.Segment, err error)

func (*TxnRelation) GetValue

func (rel *TxnRelation) GetValue(*common.ID, uint32, uint16) (v any, err error)

func (*TxnRelation) GetValueByFilter

func (rel *TxnRelation) GetValueByFilter(filter *handle.Filter, col int) (v any, err error)

func (*TxnRelation) GetValueByPhyAddrKey added in v0.6.0

func (rel *TxnRelation) GetValueByPhyAddrKey(any, int) (v any, err error)

func (*TxnRelation) ID

func (rel *TxnRelation) ID() uint64

func (*TxnRelation) LogTxnEntry

func (rel *TxnRelation) LogTxnEntry(entry txnif.TxnEntry, readed []*common.ID) (err error)

func (*TxnRelation) MakeBlockIt

func (rel *TxnRelation) MakeBlockIt() handle.BlockIt

func (*TxnRelation) MakeSegmentIt

func (rel *TxnRelation) MakeSegmentIt() handle.SegmentIt

func (*TxnRelation) RangeDelete

func (rel *TxnRelation) RangeDelete(*common.ID, uint32, uint32, handle.DeleteType) (err error)

func (*TxnRelation) Rows

func (rel *TxnRelation) Rows() int64

func (*TxnRelation) Schema

func (rel *TxnRelation) Schema() any

func (*TxnRelation) SimplePPString

func (rel *TxnRelation) SimplePPString(_ common.PPLevel) string

func (*TxnRelation) Size

func (rel *TxnRelation) Size(attr string) int64

func (*TxnRelation) SoftDeleteSegment

func (rel *TxnRelation) SoftDeleteSegment(id uint64) (err error)

func (*TxnRelation) String

func (rel *TxnRelation) String() string

func (*TxnRelation) Update

func (rel *TxnRelation) Update(*common.ID, uint32, uint16, any) (err error)

func (*TxnRelation) UpdateByFilter

func (rel *TxnRelation) UpdateByFilter(filter *handle.Filter, col uint16, v any) (err error)

func (*TxnRelation) UpdateConstraint added in v0.7.0

func (rel *TxnRelation) UpdateConstraint(cstr []byte) (err error)

type TxnSegment

type TxnSegment struct {
	Txn txnif.AsyncTxn
	Rel handle.Relation
}

func (*TxnSegment) BatchDedup

func (seg *TxnSegment) BatchDedup(containers.Vector) (err error)

func (*TxnSegment) Close

func (seg *TxnSegment) Close() error

func (*TxnSegment) CreateBlock

func (seg *TxnSegment) CreateBlock() (blk handle.Block, err error)

func (*TxnSegment) CreateNonAppendableBlock

func (seg *TxnSegment) CreateNonAppendableBlock() (blk handle.Block, err error)

func (*TxnSegment) GetBlock

func (seg *TxnSegment) GetBlock(id uint64) (blk handle.Block, err error)

func (*TxnSegment) GetID

func (seg *TxnSegment) GetID() uint64

func (*TxnSegment) GetMeta

func (seg *TxnSegment) GetMeta() any

func (*TxnSegment) GetRelation

func (seg *TxnSegment) GetRelation() (rel handle.Relation)

func (*TxnSegment) MakeBlockIt

func (seg *TxnSegment) MakeBlockIt() (it handle.BlockIt)

func (*TxnSegment) PushDeleteOp

func (seg *TxnSegment) PushDeleteOp(handle.Filter) (err error)

func (*TxnSegment) PushUpdateOp

func (seg *TxnSegment) PushUpdateOp(handle.Filter, string, any) (err error)

func (*TxnSegment) RangeDelete

func (seg *TxnSegment) RangeDelete(uint64, uint32, uint32, handle.DeleteType) (err error)

func (*TxnSegment) SoftDeleteBlock

func (seg *TxnSegment) SoftDeleteBlock(id uint64) (err error)

func (*TxnSegment) String

func (seg *TxnSegment) String() string

func (*TxnSegment) Update

func (seg *TxnSegment) Update(uint64, uint32, uint16, any) (err error)

type TxnState

type TxnState struct {
	// contains filtered or unexported fields
}

func (*TxnState) IsCommitted

func (ts *TxnState) IsCommitted() bool

func (*TxnState) IsRollbacked

func (ts *TxnState) IsRollbacked() bool

func (*TxnState) IsUncommitted

func (ts *TxnState) IsUncommitted() bool

func (*TxnState) ToCommitted

func (ts *TxnState) ToCommitted() error

func (*TxnState) ToCommitting

func (ts *TxnState) ToCommitting() error

func (*TxnState) ToRollbacked

func (ts *TxnState) ToRollbacked() error

func (*TxnState) ToRollbacking

func (ts *TxnState) ToRollbacking() error

type TxnStateCmd added in v0.6.0

type TxnStateCmd struct {
	ID       string
	State    txnif.TxnState
	CommitTs types.TS
}

func NewEmptyTxnStateCmd added in v0.6.0

func NewEmptyTxnStateCmd() *TxnStateCmd

func NewTxnStateCmd added in v0.6.0

func NewTxnStateCmd(id string, state txnif.TxnState, cts types.TS) *TxnStateCmd

func (*TxnStateCmd) ApplyCommit added in v0.6.0

func (c *TxnStateCmd) ApplyCommit()

func (*TxnStateCmd) ApplyRollback added in v0.6.0

func (c *TxnStateCmd) ApplyRollback()

func (*TxnStateCmd) Close added in v0.6.0

func (c *TxnStateCmd) Close()

func (*TxnStateCmd) Desc added in v0.6.0

func (c *TxnStateCmd) Desc() string

func (*TxnStateCmd) GetType added in v0.6.0

func (c *TxnStateCmd) GetType() int16

func (*TxnStateCmd) Marshal added in v0.6.0

func (c *TxnStateCmd) Marshal() (buf []byte, err error)

func (*TxnStateCmd) ReadFrom added in v0.6.0

func (c *TxnStateCmd) ReadFrom(r io.Reader) (n int64, err error)

func (*TxnStateCmd) SetReplayTxn added in v0.6.0

func (c *TxnStateCmd) SetReplayTxn(_ txnif.AsyncTxn)

func (*TxnStateCmd) String added in v0.6.0

func (c *TxnStateCmd) String() string

func (*TxnStateCmd) Unmarshal added in v0.6.0

func (c *TxnStateCmd) Unmarshal(buf []byte) (err error)

func (*TxnStateCmd) VerboseString added in v0.6.0

func (c *TxnStateCmd) VerboseString() string

func (*TxnStateCmd) WriteTo added in v0.6.0

func (c *TxnStateCmd) WriteTo(w io.Writer) (n int64, err error)

type TxnStoreFactory

type TxnStoreFactory = func() txnif.TxnStore

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL