txn

package
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

a pure interface version

Index

Constants

View Source
// Package-level constants.
const (
	// EMPTY is the empty string.
	EMPTY         string = ""
	// RETRYINTERVAL is a 10 millisecond duration.
	// NOTE(review): presumably the pause between retry attempts — confirm against callers.
	RETRYINTERVAL        = 10 * time.Millisecond
)

Variables

View Source
// Package-level error values. Each one is built once with errors.Errorf and
// carries only the fixed message shown, so callers can compare against these
// variables.
// NOTE(review): the errors package providing Errorf is not visible here
// (the standard library's errors package has no Errorf) — confirm the import.
var (
	KeyNotFound      = errors.Errorf("key not found")
	DirtyRead        = errors.Errorf("dirty read")
	DeserializeError = errors.Errorf("deserialize error")
	VersionMismatch  = errors.Errorf("version mismatch")
	KeyExists        = errors.Errorf("key exists")
	ReadFailed       = errors.Errorf("read failed due to unknown txn status")
)

Functions

This section is empty.

Types

type CommitInfo

// CommitInfo pairs a record key with a version string.
// RemoteClient.Commit and Transaction.RemoteCommit take a slice of these.
type CommitInfo struct {
	Key     string
	Version string
}

type Connector

// Connector is the storage-backend interface that NewDatastore accepts.
// It has two groups of methods: item operations that work on DataItem values,
// and name-based operations that work on plain values.
type Connector interface {
	// Connect takes no arguments and reports failure through its error.
	// NOTE(review): Datastore.Start is documented as establishing the
	// connection to the underlying server — presumably via this method; confirm.
	Connect() error
	// GetItem returns a DataItem for key.
	GetItem(key string) (DataItem, error)
	// PutItem takes a key and a DataItem.
	// NOTE(review): the returned string is presumably the stored item's
	// version — confirm against implementations.
	PutItem(key string, value DataItem) (string, error)
	// ConditionalUpdate takes a key, a DataItem and a doCreate flag.
	// NOTE(review): presumably a version-checked write where doCreate permits
	// creating a missing key — confirm against implementations.
	ConditionalUpdate(key string,
		value DataItem, doCreate bool) (string, error)
	// ConditionalCommit takes a key and a version string.
	// NOTE(review): exact commit semantics are not visible here — confirm.
	ConditionalCommit(key string, version string) (string, error)
	// Get, Put, Delete and AtomicCreate address entries by name and work on
	// plain values rather than DataItem.
	// NOTE(review): presumably these back the transaction state record
	// methods on Datastore (ReadTSR/WriteTSR/DeleteTSR/CreateTSR) — confirm.
	Get(name string) (string, error)
	Put(name string, value any) error
	Delete(name string) error
	AtomicCreate(name string, value any) (string, error)
}

type DataItem

// DataItem is the record abstraction exchanged with a Connector and a
// RemoteClient. It embeds TxnItem and adds accessors for the record's key,
// value, previous-version link, chain length and deletion flag.
type DataItem interface {
	TxnItem
	// Key returns the record key; this interface declares no setter for it.
	Key() string

	// Value / SetValue read and replace the record's string value.
	Value() string
	SetValue(string)

	// Prev / SetPrev read and replace a string.
	// NOTE(review): presumably a serialized previous version of the record — confirm.
	Prev() string
	SetPrev(string)

	// LinkedLen / SetLinkedLen read and replace an int.
	// NOTE(review): presumably the length of the Prev chain — confirm.
	LinkedLen() int
	SetLinkedLen(int)

	// IsDeleted / SetIsDeleted read and replace the deletion flag.
	IsDeleted() bool
	SetIsDeleted(bool)

	// Equal reports a comparison result against another DataItem.
	Equal(other DataItem) bool
	// Empty reports whether the item is considered empty.
	// NOTE(review): the exact emptiness criterion is not visible here.
	Empty() bool
}

type DataItem2

// DataItem2 is a concrete record struct carrying redis and bson struct tags.
// Its field names match those of ItemOptions, except that Version is an int
// here and a string there. Every tag name equals the field name, except Key,
// whose bson name is "_id".
type DataItem2 struct {
	Key       string       `redis:"Key" bson:"_id"`
	Value     string       `redis:"Value" bson:"Value"`
	TxnId     string       `redis:"TxnId" bson:"TxnId"`
	TxnState  config.State `redis:"TxnState" bson:"TxnState"`
	TValid    time.Time    `redis:"TValid" bson:"TValid"`
	TLease    time.Time    `redis:"TLease" bson:"TLease"`
	Prev      string       `redis:"Prev" bson:"Prev"`
	LinkedLen int          `redis:"LinkedLen" bson:"LinkedLen"`
	IsDeleted bool         `redis:"IsDeleted" bson:"IsDeleted"`
	Version   int          `redis:"Version" bson:"Version"`
}

func (*DataItem2) Equal

func (r *DataItem2) Equal(other DataItem2) bool

func (DataItem2) GetKey

func (m DataItem2) GetKey() string

func (DataItem2) MarshalBSONValue

func (mi DataItem2) MarshalBSONValue() (bsontype.Type, []byte, error)

func (DataItem2) MarshalBinary

func (r DataItem2) MarshalBinary() (data []byte, err error)

func (DataItem2) String

func (r DataItem2) String() string

func (*DataItem2) UnmarshalBSONValue

func (mi *DataItem2) UnmarshalBSONValue(t bsontype.Type, raw []byte) error

type DataItemFactory

// DataItemFactory builds DataItem values from ItemOptions.
// NewDatastore takes one alongside the Connector.
type DataItemFactory interface {
	// NewDataItem returns a DataItem for the given options.
	NewDataItem(ItemOptions) DataItem
}

type Datastore

// Datastore is the Datastorer implementation built on a Connector and a
// DataItemFactory (see NewDatastore).
type Datastore struct {

	// Name is the name of the datastore.
	Name string

	// Txn is the current transaction.
	Txn *Transaction
	// contains filtered or unexported fields
}

Datastore is a Datastorer implementation backed by the underlying Connector.

func NewDatastore

func NewDatastore(name string, conn Connector, factory DataItemFactory) *Datastore

NewDatastore creates a new instance of Datastore with the given name and connection. It initializes the read and write caches, as well as the serializer.

func (*Datastore) Abort

func (r *Datastore) Abort(hasCommitted bool) error

Abort discards the changes made in the current transaction.

  • If hasCommitted is false, it clears the write cache.
  • If hasCommitted is true, it rolls back the changes made by the current transaction.

It returns an error if there is any issue during the rollback process.

func (*Datastore) Commit

func (r *Datastore) Commit() error

Commit updates the state of records in the data store to COMMITTED.

It iterates over the write cache and updates each record's state to COMMITTED.

After updating the records, it clears the write cache. Returns an error if there is any issue updating the records.

func (*Datastore) Copy

func (r *Datastore) Copy() Datastorer

Copy returns a new instance of Datastore with the same name and connection. It is used to create a copy of the Datastore object.

func (*Datastore) CreateTSR

func (r *Datastore) CreateTSR(txnId string, txnState config.State) (config.State, error)

CreateTSR creates a new transaction state record in the Datastore. If the transaction ID already exists in the Datastore, it returns the old state and an error. If an error occurs during the creation process, it returns -1 and the error. Otherwise, it returns -1 and nil.

func (*Datastore) Delete

func (r *Datastore) Delete(key string) error

Delete deletes a record from the Datastore. It will return an error if the record is not found.

func (*Datastore) DeleteTSR

func (r *Datastore) DeleteTSR(txnId string) error

DeleteTSR deletes the transaction state record (TSR) with the given transaction ID from the underlying datastore. It returns an error if the deletion operation fails.

func (*Datastore) GetName

func (r *Datastore) GetName() string

GetName returns the name of the Datastore.

func (*Datastore) OnePhaseCommit

func (r *Datastore) OnePhaseCommit() error

func (*Datastore) Prepare

func (r *Datastore) Prepare() error

Prepare prepares the Datastore for commit.

func (*Datastore) Read

func (r *Datastore) Read(key string, value any) error

Read reads a record from the Datastore.

func (*Datastore) ReadTSR

func (r *Datastore) ReadTSR(txnId string) (config.State, error)

ReadTSR reads the transaction state from the underlying datastore. It takes a transaction ID as input and returns the corresponding state and an error, if any.

func (*Datastore) SetSerializer

func (r *Datastore) SetSerializer(se serializer.Serializer)

SetSerializer sets the serializer for the Datastore. The serializer is used to serialize and deserialize data when storing it in and retrieving it from the underlying datastore.

func (*Datastore) SetTxn

func (r *Datastore) SetTxn(txn *Transaction)

SetTxn sets the transaction for the Datastore. It takes a pointer to a Transaction as input and assigns it to the Txn field of the Datastore.

func (*Datastore) Start

func (r *Datastore) Start() error

Start starts the Datastore by establishing a connection to the underlying server. It returns an error if the connection fails.

func (*Datastore) Write

func (r *Datastore) Write(key string, value any) error

Write writes a record to the cache. It will serialize the value using the Datastore's serializer.

func (*Datastore) WriteTSR

func (r *Datastore) WriteTSR(txnId string, txnState config.State) error

WriteTSR writes the transaction state (txnState) associated with the given transaction ID (txnId) to the underlying datastore. It returns an error if the write operation fails.

type Datastorer

// Datastorer defines the operations a Transaction uses to interact with a
// data store. Datastores are registered with Transaction.AddDatastore.
type Datastorer interface {
	// Start starts a transaction, including initializing the connection.
	Start() error

	// Read reads a record from the data store. If the record is not in the cache (readCache/writeCache),
	// it reads the record from the connection and puts it into the cache.
	Read(key string, value any) error

	// Write writes records into the writeCache.
	Write(key string, value any) error

	// Delete marks a record as deleted.
	Delete(key string) error

	// Prepare executes the prepare phase of transaction commit.
	// It first marks the records in the writeCache with T_commit, TxnId, and TxnState,
	// then it performs `conditionalUpdate` in a global order.
	Prepare() error

	// Commit executes the commit phase of transaction commit.
	// It updates the records in the writeCache to the COMMITTED state
	// in the data store.
	Commit() error

	// Abort aborts the transaction.
	// It rolls back the records in the writeCache to the state before the transaction.
	Abort(hasCommitted bool) error

	// OnePhaseCommit executes the one-phase commit protocol.
	OnePhaseCommit() error

	// GetName returns the name of the data store.
	GetName() string

	// SetTxn sets the current transaction for the data store.
	SetTxn(txn *Transaction)

	// Copy returns another Datastorer derived from this one.
	Copy() Datastorer
}

Datastorer is an interface that defines the operations for interacting with a data store.

type ItemOptions

// ItemOptions is the argument to DataItemFactory.NewDataItem. Its fields
// correspond by name to the accessors declared on DataItem and TxnItem.
type ItemOptions struct {
	Key       string
	Value     string
	TxnId     string
	TxnState  config.State
	TValid    time.Time
	TLease    time.Time
	Prev      string
	LinkedLen int
	IsDeleted bool
	Version   string
}

type ItemType

// ItemType names a kind of data item as a string.
type ItemType string
// Defined ItemType values: the empty string, "redis" and "mongo".
const (
	NoneItem  ItemType = ""
	RedisItem ItemType = "redis"
	MongoItem ItemType = "mongo"
)

type NetworkItem

// NetworkItem bundles a DataItem with a DoCreate flag.
// NOTE(review): DoCreate presumably corresponds to the doCreate argument of
// Connector.ConditionalUpdate — confirm against callers.
type NetworkItem struct {
	Item     DataItem
	DoCreate bool
}

type PredicateInfo

// PredicateInfo is the value type of the validationMap passed to
// RemoteClient.Prepare and Transaction.RemotePrepare. It carries a state,
// an item key and a lease time.
type PredicateInfo struct {
	State     config.State
	ItemKey   string
	LeaseTime time.Time
}

type RecordConfig

// RecordConfig is the config argument of RemoteClient.Read and
// RemoteClient.Prepare.
// NOTE(review): the meaning and valid ranges of the individual fields are
// not visible here — confirm against the config package and callers.
type RecordConfig struct {
	GlobalName                  string
	MaxRecordLen                int
	ReadStrategy                config.ReadStrategy
	ConcurrentOptimizationLevel int
}

type RemoteClient

// RemoteClient is the client interface accepted by NewTransactionWithRemote.
// Transaction exposes similarly named methods (RemoteRead, RemotePrepare,
// RemoteCommit, RemoteAbort). Every method takes the datastore name first.
type RemoteClient interface {
	// Read takes a key, a timestamp and a RecordConfig, and returns a
	// DataItem together with a RemoteDataStrategy.
	Read(dsName string, key string, ts time.Time, config RecordConfig) (DataItem, RemoteDataStrategy, error)
	// Prepare takes the items, the start and commit times, a RecordConfig
	// and a validation map keyed by string.
	// NOTE(review): the returned map is presumably key -> new version
	// (cf. CommitInfo) — confirm.
	Prepare(dsName string, itemList []DataItem,
		startTime time.Time, commitTime time.Time,
		config RecordConfig, validationMap map[string]PredicateInfo) (map[string]string, error)
	// Commit takes a list of key/version pairs.
	Commit(dsName string, infoList []CommitInfo) error
	// Abort takes a list of keys and the transaction ID.
	Abort(dsName string, keyList []string, txnId string) error
}

type RemoteDataStrategy

// RemoteDataStrategy is a string-typed value returned alongside the item by
// RemoteClient.Read and Transaction.RemoteRead.
type RemoteDataStrategy string
// Defined RemoteDataStrategy values.
// NOTE(review): how callers act on each value is not visible here.
const (
	Normal       RemoteDataStrategy = "Normal"
	AssumeAbort  RemoteDataStrategy = "AssumeAbort"
	AssumeCommit RemoteDataStrategy = "AssumeCommit"
)

type SourceType

// SourceType names a source as a string. SetGlobalTimeSource is documented
// as setting the transaction's timeSource field to GLOBAL.
type SourceType string
// Defined SourceType values.
const (
	// Disabled: the name EMPTY is already used by the string constant
	// declared in this package.
	// EMPTY  SourceType = "EMPTY"
	LOCAL  SourceType = "LOCAL"
	GLOBAL SourceType = "GLOBAL"
)

type StateMachine

// StateMachine exposes a config.State through its GetState, SetState and
// CheckState methods. Transaction embeds a *StateMachine.
type StateMachine struct {
	// contains filtered or unexported fields
}

func NewStateMachine

func NewStateMachine() *StateMachine

func (*StateMachine) CheckState

func (st *StateMachine) CheckState(state config.State) error

func (*StateMachine) GetState

func (st *StateMachine) GetState() config.State

func (*StateMachine) SetState

func (st *StateMachine) SetState(state config.State) error

type TSRMaintainer

// TSRMaintainer defines the operations on transaction state records (TSRs).
// Datastore declares methods with these exact signatures.
type TSRMaintainer interface {
	// ReadTSR reads the transaction state record (TSR) for a transaction.
	ReadTSR(txnId string) (config.State, error)

	// WriteTSR writes the transaction state record (TSR) for a transaction.
	WriteTSR(txnId string, txnState config.State) error

	// CreateTSR atomically creates the transaction state record (TSR) for a transaction.
	// If the TSR already exists, it returns an error.
	CreateTSR(txnId string, txnState config.State) (config.State, error)

	// DeleteTSR deletes the transaction state record (TSR) for a transaction.
	DeleteTSR(txnId string) error
}

type Transaction

// Transaction represents a transaction in the system. Its exported fields
// are the transaction ID and the start and commit timestamps; the remaining
// state is unexported.
type Transaction struct {
	// TxnId is the unique identifier for the transaction.
	TxnId string
	// TxnStartTime is the timestamp when the transaction started.
	TxnStartTime time.Time
	// TxnCommitTime is the timestamp when the transaction was committed.
	TxnCommitTime time.Time

	// Embedded by pointer, so StateMachine's GetState, SetState and
	// CheckState methods are promoted onto Transaction.
	*StateMachine
	// contains filtered or unexported fields
}

Transaction represents a transaction in the system. It contains information such as the transaction ID, state, timestamps, datastores, time source, oracle URL, and locker.

func NewTransaction

func NewTransaction() *Transaction

NewTransaction creates a new Transaction object. It initializes the Transaction with default values and returns a pointer to the newly created object.

func NewTransactionWithRemote

func NewTransactionWithRemote(client RemoteClient) *Transaction

func (*Transaction) Abort

func (t *Transaction) Abort() error

Abort aborts the transaction. It checks the current state of the transaction and returns an error if the transaction is already committed, aborted, or not started. If the transaction is in a valid state, it sets the transaction state to ABORTED and calls the Abort method on each data store associated with the transaction. Returns an error if any of the data store's Abort method returns an error, otherwise returns nil.

func (*Transaction) AddDatastore

func (t *Transaction) AddDatastore(ds Datastorer) error

AddDatastore adds a datastore to the transaction. It checks if the datastore name is duplicated and returns an error if it is. Otherwise, it sets the transaction for the datastore and adds it to the transaction's datastore map.

func (*Transaction) AddDatastores

func (t *Transaction) AddDatastores(dss ...Datastorer) error

AddDatastores adds multiple datastores to the transaction. It takes a variadic parameter `dss` of type `Datastorer` which represents the datastores to be added. It returns an error if any datastore fails to be added, otherwise it returns nil.

func (*Transaction) Commit

func (t *Transaction) Commit() error

Commit commits the transaction. It checks the transaction state and performs the prepare phase. If the prepare phase fails, it aborts the transaction and returns an error. Otherwise, it proceeds to the commit phase and commits the transaction in all data stores. Finally, it deletes the transaction state record. Returns an error if any operation fails.

func (*Transaction) CreateTSR

func (t *Transaction) CreateTSR(txnId string, txnState config.State) (config.State, error)

CreateTSR creates the Transaction State Record (TSR) for the given transaction ID and state. It uses the global data store to persist the TSR. The txnId parameter specifies the ID of the transaction. The txnState parameter specifies the state of the transaction. It returns a config.State and an error; the error is non-nil if there was a problem creating the TSR.

func (*Transaction) Delete

func (t *Transaction) Delete(dsName string, key string) error

Delete deletes a key from the specified datastore in the transaction. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.

func (*Transaction) DeleteTSR

func (t *Transaction) DeleteTSR() error

DeleteTSR deletes the Transaction State Record (TSR) associated with the Transaction. It calls the DeleteTSR method of the tsrMaintainer to perform the deletion. It returns an error if the deletion operation fails.

func (*Transaction) GetTSRState

func (t *Transaction) GetTSRState(txnId string) (config.State, error)

func (*Transaction) Lock

func (t *Transaction) Lock(key string, id string, duration time.Duration) error

Lock locks the specified key with the given ID for the specified duration. If the locker is not set, it returns an error.

func (*Transaction) OnePhaseCommit

func (t *Transaction) OnePhaseCommit() error

func (*Transaction) Read

func (t *Transaction) Read(dsName string, key string, value any) error

Read reads the value associated with the given key from the specified datastore. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.

func (*Transaction) RemoteAbort

func (t *Transaction) RemoteAbort(dsName string, keyList []string) error

func (*Transaction) RemoteCommit

func (t *Transaction) RemoteCommit(dsName string, infoList []CommitInfo) error

func (*Transaction) RemotePrepare

func (t *Transaction) RemotePrepare(dsName string, itemList []DataItem, validationMap map[string]PredicateInfo) (map[string]string, error)

func (*Transaction) RemoteRead

func (t *Transaction) RemoteRead(dsName string, key string) (DataItem, RemoteDataStrategy, error)

func (*Transaction) RemoteValidate

func (t *Transaction) RemoteValidate(dsName string, key string, item DataItem) error

func (*Transaction) SetGlobalDatastore

func (t *Transaction) SetGlobalDatastore(ds Datastorer)

SetGlobalDatastore sets the global datastore for the transaction. It takes a Datastorer parameter and assigns it to the globalDataStore field of the Transaction struct.

func (*Transaction) SetGlobalTimeSource

func (t *Transaction) SetGlobalTimeSource(url string)

SetGlobalTimeSource sets the global time source for the transaction. It takes a URL as a parameter and assigns it to the transaction's oracleURL field. The timeSource field is set to GLOBAL.

func (*Transaction) SetLocker

func (t *Transaction) SetLocker(locker locker.Locker)

SetLocker sets the locker for the transaction. The locker is responsible for managing the concurrency of the transaction. It ensures that only one goroutine can access the transaction at a time. The locker must implement the locker.Locker interface.

func (*Transaction) Start

func (t *Transaction) Start() error

Start begins the transaction. It checks if the transaction is already started and returns an error if so. It also checks if the necessary datastores are added and returns an error if not. It sets the transaction state to STARTED and generates a unique transaction ID. It starts each datastore associated with the transaction. Returns an error if any of the above steps fail, otherwise returns nil.

func (*Transaction) Unlock

func (t *Transaction) Unlock(key string, id string) error

Unlock unlocks the specified key with the given ID. It returns an error if the locker is not set or if unlocking fails.

func (*Transaction) Write

func (t *Transaction) Write(dsName string, key string, value any) error

Write writes the given key-value pair to the specified datastore in the transaction. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.

func (*Transaction) WriteTSR

func (t *Transaction) WriteTSR(txnId string, txnState config.State) error

type TxnError

// TxnError is a string-backed type; with its Error() string method it
// satisfies the error interface.
type TxnError string

func (TxnError) Error

func (e TxnError) Error() string

type TxnItem

// TxnItem is the transaction-metadata portion of a record: transaction ID,
// transaction state, two timestamps and a version. DataItem embeds it.
type TxnItem interface {
	// TxnId returns the transaction ID; this interface declares no setter for it.
	TxnId() string

	// TxnState / SetTxnState read and replace the transaction state.
	TxnState() config.State
	SetTxnState(config.State)

	// TValid / SetTValid read and replace a timestamp.
	// NOTE(review): presumably the time from which the record is valid — confirm.
	TValid() time.Time
	SetTValid(time.Time)

	// TLease / SetTLease read and replace a timestamp.
	// NOTE(review): presumably the lease expiry of the owning transaction — confirm.
	TLease() time.Time
	SetTLease(time.Time)

	// Version / SetVersion read and replace the version string.
	Version() string
	SetVersion(string)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL