Documentation ¶
Index ¶
- type Connector
- type DataItem
- func (r *DataItem) Equal(other DataItem) bool
- func (m DataItem) GetKey() string
- func (mi DataItem) MarshalBSONValue() (bsontype.Type, []byte, error)
- func (r DataItem) MarshalBinary() (data []byte, err error)
- func (r DataItem) String() string
- func (mi *DataItem) UnmarshalBSONValue(t bsontype.Type, raw []byte) error
- type Datastore
- func (r *Datastore) Abort(hasCommitted bool) error
- func (r *Datastore) Commit() error
- func (r *Datastore) Copy() Datastorer
- func (r *Datastore) Delete(key string) error
- func (r *Datastore) DeleteTSR(txnId string) error
- func (r *Datastore) GetName() string
- func (r *Datastore) Prepare() error
- func (r *Datastore) Read(key string, value any) error
- func (r *Datastore) ReadTSR(txnId string) (config.State, error)
- func (r *Datastore) SetSerializer(se serializer.Serializer)
- func (r *Datastore) SetTxn(txn *Transaction)
- func (r *Datastore) Start() error
- func (r *Datastore) Write(key string, value any) error
- func (r *Datastore) WriteTSR(txnId string, txnState config.State) error
- type Datastorer
- type SourceType
- type StateMachine
- type TSRMaintainer
- type Transaction
- func (t *Transaction) Abort() error
- func (t *Transaction) AddDatastore(ds Datastorer) error
- func (t *Transaction) Commit() error
- func (t *Transaction) Delete(dsName string, key string) error
- func (t *Transaction) DeleteTSR() error
- func (t *Transaction) GetTSRState(txnId string) (config.State, error)
- func (t *Transaction) Lock(key string, id string, duration time.Duration) error
- func (t *Transaction) Read(dsName string, key string, value any) error
- func (t *Transaction) SetGlobalDatastore(ds Datastorer)
- func (t *Transaction) SetGlobalTimeSource(url string)
- func (t *Transaction) SetLocker(locker locker.Locker)
- func (t *Transaction) Start() error
- func (t *Transaction) Unlock(key string, id string) error
- func (t *Transaction) Write(dsName string, key string, value any) error
- func (t *Transaction) WriteTSR(txnId string, txnState config.State) error
- type TxnError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataItem ¶ added in v0.2.0
type DataItem struct { Key string `redis:"Key" bson:"_id"` Value string `redis:"Value" bson:"Value"` TxnId string `redis:"TxnId" bson:"TxnId"` TxnState config.State `redis:"TxnState" bson:"TxnState"` TValid time.Time `redis:"TValid" bson:"TValid"` TLease time.Time `redis:"TLease" bson:"TLease"` Prev string `redis:"Prev" bson:"Prev"` LinkedLen int `redis:"LinkedLen" bson:"LinkedLen"` IsDeleted bool `redis:"IsDeleted" bson:"IsDeleted"` Version int `redis:"Version" bson:"Version"` }
func (DataItem) MarshalBSONValue ¶ added in v0.2.0
func (DataItem) MarshalBinary ¶ added in v0.2.0
type Datastore ¶
type Datastore struct { // Name is the name of the datastore. Name string // Txn is the current transaction. Txn *Transaction // contains filtered or unexported fields }
Datastore represents a datastorer implementation using the underlying connector.
func NewDatastore ¶ added in v0.2.0
NewDatastore creates a new instance of Datastore with the given name and connection. It initializes the read and write caches, as well as the serializer.
func (*Datastore) Abort ¶
Abort discards the changes made in the current transaction. If hasCommitted is false, it clears the write cache. If hasCommitted is true, it rolls back the changes made by the current transaction. It returns an error if there is any issue during the rollback process.
func (*Datastore) Commit ¶
Commit updates the state of records in the data store to COMMITTED. It iterates over the write cache and updates each record's state to COMMITTED. After updating the records, it clears the write cache. Returns an error if there is any issue updating the records.
func (*Datastore) Copy ¶
func (r *Datastore) Copy() Datastorer
Copy returns a new instance of Datastore with the same name and connection. It is used to create a copy of the Datastore object.
func (*Datastore) Delete ¶
Delete deletes a record from the Datastore. It will return an error if the record is not found.
func (*Datastore) DeleteTSR ¶ added in v0.2.0
DeleteTSR deletes a transaction with the given transaction ID from the Redis datastore. It returns an error if the deletion operation fails.
func (*Datastore) ReadTSR ¶ added in v0.2.0
ReadTSR reads the transaction state from the Redis datastore. It takes a transaction ID as input and returns the corresponding state and an error, if any.
func (*Datastore) SetSerializer ¶ added in v0.2.0
func (r *Datastore) SetSerializer(se serializer.Serializer)
SetSerializer sets the serializer for the Datastore. The serializer is used to serialize and deserialize data when storing and retrieving it from Redis.
func (*Datastore) SetTxn ¶
func (r *Datastore) SetTxn(txn *Transaction)
SetTxn sets the transaction for the MemoryDatastore. It takes a pointer to a Transaction as input and assigns it to the Txn field of the MemoryDatastore.
func (*Datastore) Start ¶
Start starts the Datastore by establishing a connection to the underlying server. It returns an error if the connection fails.
type Datastorer ¶ added in v0.2.0
type Datastorer interface { // Start starts a transaction, including initializing the connection. Start() error // Read reads a record from the data store. If the record is not in the cache (readCache/writeCache), // it reads the record from the connection and puts it into the cache. Read(key string, value any) error // Write writes records into the writeCache. Write(key string, value any) error // Delete marks a record as deleted. Delete(key string) error // Prepare executes the prepare phase of transaction commit. // It first marks the records in the writeCache with T_commit, TxnId, and TxnState, // then it performs `conditionalUpdate` in a global order. Prepare() error // Commit executes the commit phase of transaction commit. // It updates the records in the writeCache to the COMMITTED state // in the data store. Commit() error // Abort aborts the transaction. // It rolls back the records in the writeCache to the state before the transaction. Abort(hasCommitted bool) error // GetName returns the name of the data store. GetName() string // SetTxn sets the current transaction for the data store. SetTxn(txn *Transaction) Copy() Datastorer }
Datastore is an interface that defines the operations for interacting with a data store.
type SourceType ¶
type SourceType string
const ( // EMPTY SourceType = "EMPTY" LOCAL SourceType = "LOCAL" GLOBAL SourceType = "GLOBAL" )
type StateMachine ¶
type StateMachine struct {
// contains filtered or unexported fields
}
func NewStateMachine ¶
func NewStateMachine() *StateMachine
func (*StateMachine) CheckState ¶
func (st *StateMachine) CheckState(state config.State) error
func (*StateMachine) GetState ¶
func (st *StateMachine) GetState() config.State
type TSRMaintainer ¶
type TSRMaintainer interface { // ReadTSR reads the transaction state record (TSR) for a transaction. ReadTSR(txnId string) (config.State, error) // WriteTSR writes the transaction state record (TSR) for a transaction. WriteTSR(txnId string, txnState config.State) error // DeleteTSR deletes the transaction state record (TSR) for a transaction. DeleteTSR(txnId string) error }
type Transaction ¶
type Transaction struct { // TxnId is the unique identifier for the transaction. TxnId string // TxnStartTime is the timestamp when the transaction started. TxnStartTime time.Time // TxnCommitTime is the timestamp when the transaction was committed. TxnCommitTime time.Time *StateMachine // contains filtered or unexported fields }
Transaction represents a transaction in the system. It contains information such as the transaction ID, state, timestamps, datastores, time source, oracle URL, and locker.
func NewTransaction ¶
func NewTransaction() *Transaction
NewTransaction creates a new Transaction object. It initializes the Transaction with default values and returns a pointer to the newly created object.
func (*Transaction) Abort ¶
func (t *Transaction) Abort() error
Abort aborts the transaction. It checks the current state of the transaction and returns an error if the transaction is already committed, aborted, or not started. If the transaction is in a valid state, it sets the transaction state to ABORTED and calls the Abort method on each data store associated with the transaction. Returns an error if any of the data store's Abort method returns an error, otherwise returns nil.
func (*Transaction) AddDatastore ¶
func (t *Transaction) AddDatastore(ds Datastorer) error
AddDatastore adds a datastore to the transaction. It checks if the datastore name is duplicated and returns an error if it is. Otherwise, it sets the transaction for the datastore and adds it to the transaction's datastore map.
func (*Transaction) Commit ¶
func (t *Transaction) Commit() error
Commit commits the transaction. It checks the transaction state and performs the prepare phase. If the prepare phase fails, it aborts the transaction and returns an error. Otherwise, it proceeds to the commit phase and commits the transaction in all data stores. Finally, it deletes the transaction state record. Returns an error if any operation fails.
func (*Transaction) Delete ¶
func (t *Transaction) Delete(dsName string, key string) error
Delete deletes a key from the specified datastore in the transaction. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.
func (*Transaction) DeleteTSR ¶
func (t *Transaction) DeleteTSR() error
DeleteTSR deletes the Transaction Status Record (TSR) associated with the Transaction. It calls the DeleteTSR method of the globalDataStore to perform the deletion. It returns an error if the deletion operation fails.
func (*Transaction) GetTSRState ¶
func (t *Transaction) GetTSRState(txnId string) (config.State, error)
func (*Transaction) Lock ¶
Lock locks the specified key with the given ID for the specified duration. If the locker is not set, it returns an error.
func (*Transaction) Read ¶
func (t *Transaction) Read(dsName string, key string, value any) error
Read reads the value associated with the given key from the specified datastore. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.
func (*Transaction) SetGlobalDatastore ¶
func (t *Transaction) SetGlobalDatastore(ds Datastorer)
SetGlobalDatastore sets the global datastore for the transaction. It takes a Datastore parameter and assigns it to the globalDataStore field of the Transaction struct.
func (*Transaction) SetGlobalTimeSource ¶
func (t *Transaction) SetGlobalTimeSource(url string)
SetGlobalTimeSource sets the global time source for the transaction. It takes a URL as a parameter and assigns it to the transaction's oracleURL field. The timeSource field is set to GLOBAL.
func (*Transaction) SetLocker ¶
func (t *Transaction) SetLocker(locker locker.Locker)
SetLocker sets the locker for the transaction. The locker is responsible for managing the concurrency of the transaction. It ensures that only one goroutine can access the transaction at a time. The locker must implement the locker.Locker interface.
func (*Transaction) Start ¶
func (t *Transaction) Start() error
Start begins the transaction. It checks if the transaction is already started and returns an error if so. It also checks if the necessary datastores are added and returns an error if not. It sets the transaction state to STARTED and generates a unique transaction ID. It starts each datastore associated with the transaction. Returns an error if any of the above steps fail, otherwise returns nil.
func (*Transaction) Unlock ¶
func (t *Transaction) Unlock(key string, id string) error
Unlock unlocks the specified key with the given ID. It returns an error if the locker is not set or if unlocking fails.
func (*Transaction) Write ¶
func (t *Transaction) Write(dsName string, key string, value any) error
Write writes the given key-value pair to the specified datastore in the transaction. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.
func (*Transaction) WriteTSR ¶
func (t *Transaction) WriteTSR(txnId string, txnState config.State) error
WriteTSR writes the Transaction State Record (TSR) for the given transaction ID and state. It uses the global data store to persist the TSR. The txnId parameter specifies the ID of the transaction. The txnState parameter specifies the state of the transaction. Returns an error if there was a problem writing the TSR.