holder

package
v0.1.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// SQL statements issued against the global_table and branch_table tables.
//
// Several literals are elided in this view: they appear as an empty raw
// string followed by an "N-byte string literal not displayed" comment, so
// their actual text must be read from the package source. Only statements
// whose text is visible are described in detail below.
const (
	// QueryGlobalTransactionDOByXid: statement text not displayed here.
	// NOTE(review): presumably selects a global_table row by xid — confirm.
	QueryGlobalTransactionDOByXid = `` /* 194-byte string literal not displayed */

	// QueryGlobalTransactionDOByTransactionId: statement text not displayed here.
	// NOTE(review): presumably selects a global_table row by transaction_id — confirm.
	QueryGlobalTransactionDOByTransactionId = `` /* 205-byte string literal not displayed */

	// InsertGlobalTransactionDO: statement text not displayed here.
	// NOTE(review): presumably inserts a row into global_table — confirm.
	InsertGlobalTransactionDO = `` /* 236-byte string literal not displayed */

	// UpdateGlobalTransactionDO sets status and refreshes gmt_modified on the
	// global_table row matching xid. Placeholders, in order: status, xid.
	// NOTE(review): this uses now() while UpdateBranchTransactionDO uses
	// now(6); confirm whether the difference in precision is intended.
	UpdateGlobalTransactionDO     = "update global_table set status = ?, gmt_modified = now() where xid = ?"
	// DeleteGlobalTransactionDO deletes the global_table row matching xid.
	DeleteGlobalTransactionDO     = "delete from global_table where xid = ?"
	// QueryBranchTransactionDOByXid: statement text not displayed here.
	// NOTE(review): presumably selects branch_table rows by xid — confirm.
	QueryBranchTransactionDOByXid = `` /* 206-byte string literal not displayed */

	// InsertBranchTransactionDO: statement text not displayed here.
	// NOTE(review): presumably inserts a row into branch_table — confirm.
	InsertBranchTransactionDO = `` /* 223-byte string literal not displayed */

	// UpdateBranchTransactionDO sets status and refreshes gmt_modified on the
	// branch_table row matching xid and branch_id.
	// Placeholders, in order: status, xid, branch_id.
	UpdateBranchTransactionDO = "update branch_table set status = ?, gmt_modified = now(6) where xid = ? and branch_id = ?"
	// DeleteBranchTransactionDO deletes the branch_table row matching xid and
	// branch_id. Placeholders, in order: xid, branch_id.
	DeleteBranchTransactionDO = "delete from branch_table where xid = ? and branch_id = ?"
	// QueryMaxTransactionId selects the largest transaction_id in global_table
	// that lies strictly between two bounds. The first placeholder is the
	// exclusive upper bound and the second is the exclusive lower bound.
	// NOTE(review): the result alias is spelled "maxTransactioId" (missing
	// an "n"). It is runtime SQL text, so it is left as is; confirm no caller
	// reads the column by name before correcting it.
	QueryMaxTransactionId     = "select max(transaction_id) as maxTransactioId from global_table where transaction_id < ? and transaction_id > ?"
	// QueryMaxBranchId selects the largest branch_id in branch_table that lies
	// strictly between two bounds. The first placeholder is the exclusive
	// upper bound and the second is the exclusive lower bound.
	QueryMaxBranchId          = "select max(branch_id) as maxBranchId from branch_table where branch_id < ? and branch_id > ?"
)

Variables

View Source
// String names for three session managers. How they are consumed is not
// visible in this view.
// NOTE(review): presumably these name the async-commit, retry-commit and
// retry-rollback session managers held by SessionHolder — confirm.
// NOTE(review): the identifiers use ALL_CAPS and are mutable vars; renaming
// them or making them constants would change the exported API, so they are
// left as declared.
var (
	// ASYNC_COMMITTING_SESSION_MANAGER_NAME is "async.commit.data".
	ASYNC_COMMITTING_SESSION_MANAGER_NAME  = "async.commit.data"
	// RETRY_COMMITTING_SESSION_MANAGER_NAME is "retry.commit.data".
	RETRY_COMMITTING_SESSION_MANAGER_NAME  = "retry.commit.data"
	// RETRY_ROLLBACKING_SESSION_MANAGER_NAME is "retry.rollback.data".
	RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data"
)
View Source
// FileTrxNum is a package-level int64 that starts at its zero value (0).
// NOTE(review): presumably counts transactions written to the current store
// file — confirm against the file-based store manager.
var FileTrxNum int64
View Source
// HisDataFilenamePostfix is the string ".1".
// NOTE(review): presumably the suffix appended to a data file name to form
// the history file name — confirm against the file-based store manager.
var HisDataFilenamePostfix = ".1"
View Source
// MaxTrxTimeoutMills is an int64 equal to 30 * 60 * 1000 = 1800000.
// NOTE(review): going by the name this is a timeout in milliseconds
// (30 minutes) — confirm the unit at the point of use.
var MaxTrxTimeoutMills = int64(30 * 60 * 1000)
View Source
// PerFileBlockSize is an int64 equal to 65535 * 8 = 524280.
// NOTE(review): presumably a size in bytes used by the file-based store —
// confirm the unit at the point of use.
var PerFileBlockSize = int64(65535 * 8)

Functions

func Init

func Init()

Types

type AbstractSessionManager

// AbstractSessionManager holds a TransactionStoreManager and a name.
//
// The methods listed for it on this page are the add/remove/update methods
// for global and branch sessions. The Find* and AllSessions methods of
// SessionManager are not listed for this type.
type AbstractSessionManager struct {
	// TransactionStoreManager is the store manager held by this session manager.
	TransactionStoreManager TransactionStoreManager
	// Name is the session manager's name.
	Name                    string
}

func (*AbstractSessionManager) AddBranchSession

func (sessionManager *AbstractSessionManager) AddBranchSession(globalSession *session.GlobalSession, session *session.BranchSession) error

func (*AbstractSessionManager) AddGlobalSession

func (sessionManager *AbstractSessionManager) AddGlobalSession(session *session.GlobalSession) error

func (*AbstractSessionManager) RemoveBranchSession

func (sessionManager *AbstractSessionManager) RemoveBranchSession(globalSession *session.GlobalSession, session *session.BranchSession) error

func (*AbstractSessionManager) RemoveGlobalSession

func (sessionManager *AbstractSessionManager) RemoveGlobalSession(session *session.GlobalSession) error

func (*AbstractSessionManager) UpdateBranchSessionStatus

func (sessionManager *AbstractSessionManager) UpdateBranchSessionStatus(session *session.BranchSession, status meta.BranchStatus) error

func (*AbstractSessionManager) UpdateGlobalSessionStatus

func (sessionManager *AbstractSessionManager) UpdateGlobalSessionStatus(session *session.GlobalSession, status meta.GlobalStatus) error

type AbstractTransactionStoreManager

// AbstractTransactionStoreManager is a field-less struct. The methods listed
// for it on this page have the same names and signatures as those of the
// TransactionStoreManager interface.
// NOTE(review): presumably a no-op base meant to be embedded — confirm
// against the method bodies, which are not shown here.
type AbstractTransactionStoreManager struct {
}

func (*AbstractTransactionStoreManager) GetCurrentMaxSessionId

func (transactionStoreManager *AbstractTransactionStoreManager) GetCurrentMaxSessionId() int64

func (*AbstractTransactionStoreManager) ReadSession

func (transactionStoreManager *AbstractTransactionStoreManager) ReadSession(xid string) *session.GlobalSession

func (*AbstractTransactionStoreManager) ReadSessionWithBranchSessions

func (transactionStoreManager *AbstractTransactionStoreManager) ReadSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession

func (*AbstractTransactionStoreManager) ReadSessionWithSessionCondition

func (transactionStoreManager *AbstractTransactionStoreManager) ReadSessionWithSessionCondition(sessionCondition model.SessionCondition) []*session.GlobalSession

func (*AbstractTransactionStoreManager) Shutdown

func (transactionStoreManager *AbstractTransactionStoreManager) Shutdown()

func (*AbstractTransactionStoreManager) WriteSession

func (transactionStoreManager *AbstractTransactionStoreManager) WriteSession(logOperation LogOperation, session session.SessionStorable) bool

type DBTransactionStoreManager

// DBTransactionStoreManager is a store manager that holds a LogStore.
//
// Its listed methods cover every method of the TransactionStoreManager
// interface, plus ReadSessionByTransactionId.
type DBTransactionStoreManager struct {
	// LogStore is the LogStore held by this manager.
	// NOTE(review): presumably all reads and writes go through it — confirm.
	LogStore LogStore
	// contains filtered or unexported fields
}

func (*DBTransactionStoreManager) GetCurrentMaxSessionId

func (storeManager *DBTransactionStoreManager) GetCurrentMaxSessionId() int64

func (*DBTransactionStoreManager) ReadSession

func (storeManager *DBTransactionStoreManager) ReadSession(xid string) *session.GlobalSession

func (*DBTransactionStoreManager) ReadSessionByTransactionId

func (storeManager *DBTransactionStoreManager) ReadSessionByTransactionId(transactionId int64) *session.GlobalSession

func (*DBTransactionStoreManager) ReadSessionWithBranchSessions

func (storeManager *DBTransactionStoreManager) ReadSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession

func (*DBTransactionStoreManager) ReadSessionWithSessionCondition

func (storeManager *DBTransactionStoreManager) ReadSessionWithSessionCondition(sessionCondition model.SessionCondition) []*session.GlobalSession

func (*DBTransactionStoreManager) Shutdown

func (storeManager *DBTransactionStoreManager) Shutdown()

func (*DBTransactionStoreManager) WriteSession

func (storeManager *DBTransactionStoreManager) WriteSession(logOperation LogOperation, session session.SessionStorable) bool

type DataBaseSessionManager

// DataBaseSessionManager is a session manager that holds a task name and a
// TransactionStoreManager.
//
// Its listed methods cover every method of the SessionManager interface, plus
// Reload. NewDataBaseSessionManager takes a task name and a DB store config
// and returns a SessionManager.
type DataBaseSessionManager struct {
	// TaskName is the task name for this manager.
	// NOTE(review): presumably the value passed to NewDataBaseSessionManager —
	// confirm.
	TaskName string

	// TransactionStoreManager is the store manager held by this session manager.
	TransactionStoreManager TransactionStoreManager
	// contains filtered or unexported fields
}

func (*DataBaseSessionManager) AddBranchSession

func (sessionManager *DataBaseSessionManager) AddBranchSession(globalSession *session.GlobalSession, session *session.BranchSession) error

func (*DataBaseSessionManager) AddGlobalSession

func (sessionManager *DataBaseSessionManager) AddGlobalSession(session *session.GlobalSession) error

func (*DataBaseSessionManager) AllSessions

func (sessionManager *DataBaseSessionManager) AllSessions() []*session.GlobalSession

func (*DataBaseSessionManager) FindGlobalSession

func (sessionManager *DataBaseSessionManager) FindGlobalSession(xid string) *session.GlobalSession

func (*DataBaseSessionManager) FindGlobalSessionWithBranchSessions

func (sessionManager *DataBaseSessionManager) FindGlobalSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession

func (*DataBaseSessionManager) FindGlobalSessions

func (sessionManager *DataBaseSessionManager) FindGlobalSessions(condition model.SessionCondition) []*session.GlobalSession

func (*DataBaseSessionManager) Reload

func (sessionManager *DataBaseSessionManager) Reload()

func (*DataBaseSessionManager) RemoveBranchSession

func (sessionManager *DataBaseSessionManager) RemoveBranchSession(globalSession *session.GlobalSession, session *session.BranchSession) error

func (*DataBaseSessionManager) RemoveGlobalSession

func (sessionManager *DataBaseSessionManager) RemoveGlobalSession(session *session.GlobalSession) error

func (*DataBaseSessionManager) UpdateBranchSessionStatus

func (sessionManager *DataBaseSessionManager) UpdateBranchSessionStatus(session *session.BranchSession, status meta.BranchStatus) error

func (*DataBaseSessionManager) UpdateGlobalSessionStatus

func (sessionManager *DataBaseSessionManager) UpdateGlobalSessionStatus(session *session.GlobalSession, status meta.GlobalStatus) error

type DefaultSessionManager

// DefaultSessionManager embeds AbstractSessionManager and adds a map of
// global sessions keyed by string.
//
// On top of the embedded type's methods, this page lists AddGlobalSession,
// RemoveGlobalSession, the Find* methods, AllSessions and
// SetTransactionStoreManager for it.
type DefaultSessionManager struct {
	AbstractSessionManager
	// SessionMap maps a string key to a global session.
	// NOTE(review): presumably keyed by xid — confirm. Nothing visible here
	// guards this map, so check how concurrent access is handled before
	// touching it from multiple goroutines.
	SessionMap map[string]*session.GlobalSession
}

func (*DefaultSessionManager) AddGlobalSession

func (sessionManager *DefaultSessionManager) AddGlobalSession(session *session.GlobalSession) error

func (*DefaultSessionManager) AllSessions

func (sessionManager *DefaultSessionManager) AllSessions() []*session.GlobalSession

func (*DefaultSessionManager) FindGlobalSession

func (sessionManager *DefaultSessionManager) FindGlobalSession(xid string) *session.GlobalSession

func (*DefaultSessionManager) FindGlobalSessionWithBranchSessions

func (sessionManager *DefaultSessionManager) FindGlobalSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession

func (*DefaultSessionManager) FindGlobalSessions

func (sessionManager *DefaultSessionManager) FindGlobalSessions(condition model.SessionCondition) []*session.GlobalSession

func (*DefaultSessionManager) RemoveGlobalSession

func (sessionManager *DefaultSessionManager) RemoveGlobalSession(session *session.GlobalSession) error

func (*DefaultSessionManager) SetTransactionStoreManager

func (sessionManager *DefaultSessionManager) SetTransactionStoreManager(transactionStoreManager TransactionStoreManager)

type FileBasedSessionManager

// FileBasedSessionManager embeds DefaultSessionManager and additionally has
// a Reload method. NewFileBasedSessionManager takes a file store config and
// returns a SessionManager.
type FileBasedSessionManager struct {
	DefaultSessionManager
	// contains filtered or unexported fields
}

func (*FileBasedSessionManager) Reload

func (sessionManager *FileBasedSessionManager) Reload()

type FileTransactionStoreManager

// FileTransactionStoreManager is a store manager that holds a SessionManager,
// two int64 time fields and an embedded mutex.
//
// Its listed methods cover every method of the TransactionStoreManager
// interface and both methods of ReloadableStore (ReadWriteStore and
// HasRemaining), plus InitFile.
//
// Because sync.Mutex is embedded, Lock and Unlock are promoted onto this type
// and values of it must not be copied after first use.
type FileTransactionStoreManager struct {
	// SessionManager is the session manager held by this store manager.
	SessionManager SessionManager

	// LastModifiedTime is an int64 timestamp.
	// NOTE(review): unit and epoch are not visible here — confirm.
	LastModifiedTime  int64
	// TrxStartTimeMills is an int64 timestamp.
	// NOTE(review): going by the name, milliseconds — confirm.
	TrxStartTimeMills int64
	sync.Mutex
	// contains filtered or unexported fields
}

func (*FileTransactionStoreManager) GetCurrentMaxSessionId

func (storeManager *FileTransactionStoreManager) GetCurrentMaxSessionId() int64

func (*FileTransactionStoreManager) HasRemaining

func (storeManager *FileTransactionStoreManager) HasRemaining(isHistory bool) bool

func (*FileTransactionStoreManager) InitFile

func (storeManager *FileTransactionStoreManager) InitFile(fullFileName string)

func (*FileTransactionStoreManager) ReadSession

func (storeManager *FileTransactionStoreManager) ReadSession(xid string) *session.GlobalSession

func (*FileTransactionStoreManager) ReadSessionWithBranchSessions

func (storeManager *FileTransactionStoreManager) ReadSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession

func (*FileTransactionStoreManager) ReadSessionWithSessionCondition

func (storeManager *FileTransactionStoreManager) ReadSessionWithSessionCondition(sessionCondition model.SessionCondition) []*session.GlobalSession

func (*FileTransactionStoreManager) ReadWriteStore

func (storeManager *FileTransactionStoreManager) ReadWriteStore(readSize int, isHistory bool) []*TransactionWriteStore

func (*FileTransactionStoreManager) Shutdown

func (storeManager *FileTransactionStoreManager) Shutdown()

func (*FileTransactionStoreManager) WriteSession

func (storeManager *FileTransactionStoreManager) WriteSession(logOperation LogOperation, session session.SessionStorable) bool

type LogOperation

// LogOperation is the byte-sized operation code passed to
// TransactionStoreManager.WriteSession together with the session being stored.
type LogOperation byte
// LogOperation values. They are assigned by iota, so they run from 0
// (LogOperationGlobalAdd) to 5 (LogOperationBranchRemove) in declaration
// order; reordering the list changes the numeric values.
const (
	// LogOperationGlobalAdd is the global add log operation (value 0).
	LogOperationGlobalAdd LogOperation = iota
	// LogOperationGlobalUpdate is the global update log operation (value 1).
	LogOperationGlobalUpdate
	// LogOperationGlobalRemove is the global remove log operation (value 2).
	LogOperationGlobalRemove
	// LogOperationBranchAdd is the branch add log operation (value 3).
	LogOperationBranchAdd
	// LogOperationBranchUpdate is the branch update log operation (value 4).
	LogOperationBranchUpdate
	// LogOperationBranchRemove is the branch remove log operation (value 5).
	LogOperationBranchRemove
)

func (LogOperation) String

func (t LogOperation) String() string

type LogStore

// LogStore is the persistence interface for global and branch transaction
// records (model.GlobalTransactionDO and model.BranchTransactionDO).
//
// The insert/update/delete methods return a bool rather than an error.
// NOTE(review): presumably true means the operation succeeded, and the query
// methods presumably return nil (or an empty slice) when nothing matches —
// confirm against the implementation.
type LogStore interface {
	// QueryGlobalTransactionDOByXid looks up one global transaction record by xid.
	QueryGlobalTransactionDOByXid(xid string) *model.GlobalTransactionDO
	// QueryGlobalTransactionDOByTransactionId looks up one global transaction
	// record by transaction id.
	QueryGlobalTransactionDOByTransactionId(transactionId int64) *model.GlobalTransactionDO
	// QueryGlobalTransactionDOByStatuses looks up global transaction records
	// by a list of status values.
	// NOTE(review): limit presumably caps the number of records returned — confirm.
	QueryGlobalTransactionDOByStatuses(statuses []int, limit int) []*model.GlobalTransactionDO
	// InsertGlobalTransactionDO inserts a global transaction record.
	InsertGlobalTransactionDO(globalTransaction model.GlobalTransactionDO) bool
	// UpdateGlobalTransactionDO updates a global transaction record.
	UpdateGlobalTransactionDO(globalTransaction model.GlobalTransactionDO) bool
	// DeleteGlobalTransactionDO deletes a global transaction record.
	DeleteGlobalTransactionDO(globalTransaction model.GlobalTransactionDO) bool
	// QueryBranchTransactionDOByXid looks up the branch transaction records
	// for one xid.
	QueryBranchTransactionDOByXid(xid string) []*model.BranchTransactionDO
	// QueryBranchTransactionDOByXids looks up the branch transaction records
	// for a list of xids.
	QueryBranchTransactionDOByXids(xids []string) []*model.BranchTransactionDO
	// InsertBranchTransactionDO inserts a branch transaction record.
	InsertBranchTransactionDO(branchTransaction model.BranchTransactionDO) bool
	// UpdateBranchTransactionDO updates a branch transaction record.
	UpdateBranchTransactionDO(branchTransaction model.BranchTransactionDO) bool
	// DeleteBranchTransactionDO deletes a branch transaction record.
	DeleteBranchTransactionDO(branchTransaction model.BranchTransactionDO) bool
	// GetCurrentMaxSessionId returns the current maximum session id for the
	// given high and low values.
	// NOTE(review): high and low are presumably the exclusive upper and lower
	// bounds used by the QueryMaxTransactionId / QueryMaxBranchId statements —
	// confirm.
	GetCurrentMaxSessionId(high int64, low int64) int64
}

type LogStoreDataBaseDAO

// LogStoreDataBaseDAO has only unexported fields in this view. The methods
// listed for it on this page cover every method of the LogStore interface.
type LogStoreDataBaseDAO struct {
	// contains filtered or unexported fields
}

func (*LogStoreDataBaseDAO) DeleteBranchTransactionDO

func (dao *LogStoreDataBaseDAO) DeleteBranchTransactionDO(branchTransaction model.BranchTransactionDO) bool

func (*LogStoreDataBaseDAO) DeleteGlobalTransactionDO

func (dao *LogStoreDataBaseDAO) DeleteGlobalTransactionDO(globalTransaction model.GlobalTransactionDO) bool

func (*LogStoreDataBaseDAO) GetCurrentMaxSessionId

func (dao *LogStoreDataBaseDAO) GetCurrentMaxSessionId(high int64, low int64) int64

func (*LogStoreDataBaseDAO) InsertBranchTransactionDO

func (dao *LogStoreDataBaseDAO) InsertBranchTransactionDO(branchTransaction model.BranchTransactionDO) bool

func (*LogStoreDataBaseDAO) InsertGlobalTransactionDO

func (dao *LogStoreDataBaseDAO) InsertGlobalTransactionDO(globalTransaction model.GlobalTransactionDO) bool

func (*LogStoreDataBaseDAO) QueryBranchTransactionDOByXid

func (dao *LogStoreDataBaseDAO) QueryBranchTransactionDOByXid(xid string) []*model.BranchTransactionDO

func (*LogStoreDataBaseDAO) QueryBranchTransactionDOByXids

func (dao *LogStoreDataBaseDAO) QueryBranchTransactionDOByXids(xids []string) []*model.BranchTransactionDO

func (*LogStoreDataBaseDAO) QueryGlobalTransactionDOByStatuses

func (dao *LogStoreDataBaseDAO) QueryGlobalTransactionDOByStatuses(statuses []int, limit int) []*model.GlobalTransactionDO

func (*LogStoreDataBaseDAO) QueryGlobalTransactionDOByTransactionId

func (dao *LogStoreDataBaseDAO) QueryGlobalTransactionDOByTransactionId(transactionId int64) *model.GlobalTransactionDO

func (*LogStoreDataBaseDAO) QueryGlobalTransactionDOByXid

func (dao *LogStoreDataBaseDAO) QueryGlobalTransactionDOByXid(xid string) *model.GlobalTransactionDO

func (*LogStoreDataBaseDAO) UpdateBranchTransactionDO

func (dao *LogStoreDataBaseDAO) UpdateBranchTransactionDO(branchTransaction model.BranchTransactionDO) bool

func (*LogStoreDataBaseDAO) UpdateGlobalTransactionDO

func (dao *LogStoreDataBaseDAO) UpdateGlobalTransactionDO(globalTransaction model.GlobalTransactionDO) bool

type Reloadable

// Reloadable is implemented by types whose state can be reloaded.
// DataBaseSessionManager and FileBasedSessionManager both list a Reload method.
type Reloadable interface {
	// Reload reloads states. It takes no arguments and returns nothing, so
	// a failure cannot be reported to the caller through this signature.
	Reload()
}

type ReloadableStore

// ReloadableStore is a store whose written entries can be read back as
// TransactionWriteStore values. FileTransactionStoreManager lists both of
// its methods.
type ReloadableStore interface {
	// ReadWriteStore reads write-store entries and returns them as a list.
	// readSize is the read size and isHistory is the history flag.
	// NOTE(review): readSize presumably caps the number of entries read, and
	// isHistory presumably selects the history file instead of the current
	// one — confirm against the implementation.
	ReadWriteStore(readSize int, isHistory bool) []*TransactionWriteStore

	// HasRemaining reports whether anything remains for the given isHistory
	// flag.
	// NOTE(review): presumably "entries still unread" — confirm.
	HasRemaining(isHistory bool) bool
}

type SessionHolder

// SessionHolder groups four SessionManager values. GetSessionHolder returns
// one by value, and its FindGlobalSession* methods use value receivers.
type SessionHolder struct {
	// RootSessionManager is the root session manager.
	RootSessionManager             SessionManager
	// AsyncCommittingSessionManager is the session manager for
	// asynchronously committing sessions.
	AsyncCommittingSessionManager  SessionManager
	// RetryCommittingSessionManager is the session manager for sessions
	// whose commit is being retried.
	RetryCommittingSessionManager  SessionManager
	// RetryRollbackingSessionManager is the session manager for sessions
	// whose rollback is being retried.
	RetryRollbackingSessionManager SessionManager
}

func GetSessionHolder

func GetSessionHolder() SessionHolder

func (SessionHolder) FindGlobalSession

func (sessionHolder SessionHolder) FindGlobalSession(xid string) *session.GlobalSession

func (SessionHolder) FindGlobalSessionWithBranchSessions

func (sessionHolder SessionHolder) FindGlobalSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession

type SessionManager

// SessionManager manages global sessions and their branch sessions: adding,
// finding, updating status and removing. The mutating methods return an
// error; the lookup methods return pointers or slices only.
// NOTE(review): lookups presumably return nil (or an empty slice) when
// nothing matches — confirm against the implementations.
type SessionManager interface {
	// AddGlobalSession adds a global session.
	AddGlobalSession(session *session.GlobalSession) error

	// FindGlobalSession finds the global session for xid.
	FindGlobalSession(xid string) *session.GlobalSession

	// FindGlobalSessionWithBranchSessions finds the global session for xid.
	// NOTE(review): withBranchSessions presumably controls whether the
	// session's branch sessions are loaded as well — confirm.
	FindGlobalSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession

	// UpdateGlobalSessionStatus updates the status of a global session.
	UpdateGlobalSessionStatus(session *session.GlobalSession, status meta.GlobalStatus) error

	// RemoveGlobalSession removes a global session.
	RemoveGlobalSession(session *session.GlobalSession) error

	// AddBranchSession adds a branch session to a global session.
	AddBranchSession(globalSession *session.GlobalSession, session *session.BranchSession) error

	// UpdateBranchSessionStatus updates the status of a branch session.
	UpdateBranchSessionStatus(session *session.BranchSession, status meta.BranchStatus) error

	// RemoveBranchSession removes a branch session from a global session.
	RemoveBranchSession(globalSession *session.GlobalSession, session *session.BranchSession) error

	// AllSessions returns all global sessions.
	AllSessions() []*session.GlobalSession

	// FindGlobalSessions returns the global sessions matching condition.
	FindGlobalSessions(condition model.SessionCondition) []*session.GlobalSession
}

func NewDataBaseSessionManager

func NewDataBaseSessionManager(taskName string, conf config.DBStoreConfig) SessionManager

func NewDefaultSessionManager

func NewDefaultSessionManager(name string) SessionManager

func NewFileBasedSessionManager

func NewFileBasedSessionManager(conf config.FileStoreConfig) SessionManager

type TransactionStoreManager

// TransactionStoreManager is the storage interface behind a session manager:
// it writes session changes and reads global sessions back.
// NOTE(review): the read methods presumably return nil (or an empty slice)
// when nothing matches — confirm against the implementations.
type TransactionStoreManager interface {
	// WriteSession writes session for the given log operation and returns a
	// bool.
	// NOTE(review): presumably true means the write succeeded — confirm.
	WriteSession(logOperation LogOperation, session session.SessionStorable) bool

	// ReadSession reads the global session for xid.
	ReadSession(xid string) *session.GlobalSession

	// ReadSessionWithBranchSessions reads the global session for xid.
	// NOTE(review): withBranchSessions presumably controls whether the
	// session's branch sessions are loaded as well — confirm.
	ReadSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession

	// ReadSessionWithSessionCondition reads the global sessions matching
	// sessionCondition.
	// NOTE(review): the original comment says "by status list", so the
	// condition presumably carries a list of statuses — confirm.
	ReadSessionWithSessionCondition(sessionCondition model.SessionCondition) []*session.GlobalSession

	// Shutdown shuts the store manager down.
	Shutdown()

	// GetCurrentMaxSessionId gets the current max session id.
	GetCurrentMaxSessionId() int64
}

type TransactionWriteStore

// TransactionWriteStore pairs a storable session with the log operation
// applied to it. This page lists Encode and Decode methods for converting it
// to and from bytes, and ReloadableStore.ReadWriteStore returns a slice of
// these.
type TransactionWriteStore struct {
	// SessionRequest is the storable session carried by this entry.
	SessionRequest session.SessionStorable
	// LogOperation is the log operation for SessionRequest.
	LogOperation   LogOperation
}

func (*TransactionWriteStore) Decode

func (transactionWriteStore *TransactionWriteStore) Decode(src []byte)

func (*TransactionWriteStore) Encode

func (transactionWriteStore *TransactionWriteStore) Encode() ([]byte, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL