Documentation ¶
Index ¶
- Variables
- func RunLockServicesForTest(level zapcore.Level, serviceIDs []string, lockTableBindTimeout time.Duration, ...)
- func SetLockServiceByServiceID(serviceID string, value LockService)
- func ShardingByRow(row []byte) uint64
- func WaitWaiters(ls LockService, group uint32, table uint64, key []byte, waitersCount int) error
- type AllocatorOption
- type Client
- type ClientOption
- type Config
- type Lock
- type LockOptions
- type LockService
- type LockStorage
- type LockTableAllocator
- type LockTableKeeper
- type Option
- type RequestHandleFunc
- type Server
- type ServerOption
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrDeadlockDetectorClosed deadlock detector is closed
	ErrDeadlockDetectorClosed = moerr.NewInvalidStateNoCtx("deadlock detector is closed")
	// ErrTxnNotFound txn not found
	ErrTxnNotFound = moerr.NewInvalidStateNoCtx("txn not found")
	// ErrMergeRangeLockNotSupport merge range lock not support with shared lock
	ErrMergeRangeLockNotSupport = moerr.NewNotSupportedNoCtx("merge range lock not support with shared lock")
	// ErrDeadLockDetected dead lock detected
	ErrDeadLockDetected = moerr.NewDeadLockDetectedNoCtx()
	// ErrDeadlockCheckBusy dead lock check is busy
	ErrDeadlockCheckBusy = moerr.NewDeadlockCheckBusyNoCtx()
	// ErrLockTableBindChanged lock table and lock service bind changed
	ErrLockTableBindChanged = moerr.NewLockTableBindChangedNoCtx()
	// ErrLockTableNotFound lock table not found on remote lock service
	ErrLockTableNotFound = moerr.NewLockTableNotFoundNoCtx()
	// ErrLockConflict lock option conflict
	ErrLockConflict = moerr.NewLockConflictNoCtx()
)
Functions ¶
func RunLockServicesForTest ¶ added in v0.8.0
func RunLockServicesForTest(
	level zapcore.Level,
	serviceIDs []string,
	lockTableBindTimeout time.Duration,
	fn func(LockTableAllocator, []LockService),
	adjustConfig func(*Config),
	opts ...Option,
)
RunLockServicesForTest starts a lock table allocator and a set of lock services for tests.
func SetLockServiceByServiceID ¶ added in v0.8.0
func SetLockServiceByServiceID(serviceID string, value LockService)
SetLockServiceByServiceID stores the lockservice instance in the process-level runtime under the given service ID.
func ShardingByRow ¶ added in v1.2.3
func WaitWaiters ¶ added in v0.8.0
WaitWaiters waits until the lock on key in the given group and table has waitersCount waiters.
Types ¶
type AllocatorOption ¶ added in v1.2.0
type AllocatorOption func(*lockTableAllocator)
type Client ¶ added in v0.8.0
type Client interface {
	// Send send request to other lock service, and wait for a response synchronously.
	Send(context.Context, *pb.Request) (*pb.Response, error)
	// AsyncSend async send request to other lock service.
	AsyncSend(context.Context, *pb.Request) (*morpc.Future, error)
	// Close close the client
	Close() error
}
Client is used to send lock table operations to other services:
1. lock service <-> lock service
2. lock service <-> lock table allocator
type ClientOption ¶ added in v1.2.0
type ClientOption func(c *client)
func WithMOCluster ¶ added in v1.2.0
func WithMOCluster(cluster clusterservice.MOCluster) ClientOption
type Config ¶ added in v0.8.0
type Config struct {
	// ServiceID service id
	ServiceID string `toml:"-"`
	// RPC rpc config
	RPC morpc.Config `toml:"-"`
	// TxnIterFunc used to iterate all active transactions in current cn
	TxnIterFunc func(func([]byte) bool) `toml:"-"`

	// ListenAddress lock service listen address for receiving lock requests
	ListenAddress string `toml:"listen-address"`
	// ServiceAddress service address for communication, if this address is not set, use
	// ListenAddress as the communication address.
	ServiceAddress string `toml:"service-address"`
	// MaxFixedSliceSize lockservice uses fixedSlice to store all lock information, a pool
	// of fixedSlice will be built internally, there are various specifications of fixedSlice,
	// this value sets how big the maximum specification of fixedSlice is.
	MaxFixedSliceSize toml.ByteSize `toml:"max-fixed-slice-size"`
	// KeepBindDuration Maintain the period of the locktable bound to the current service
	KeepBindDuration toml.Duration `toml:"keep-bind-duration"`
	// KeepRemoteLockDuration how often to send a heartbeat to maintain a lock on a remote
	// locktable.
	KeepRemoteLockDuration toml.Duration `toml:"keep-remote-lock-duration"`
	// RemoteLockTimeout how long does it take to receive a heartbeat that maintains the
	// remote lock before releasing the lock
	RemoteLockTimeout toml.Duration `toml:"remote-lock-timeout"`
	// MaxLockRowCount each time a lock is added, some LockRow is stored in the lockservice, if
	// too many LockRows are put in each time, it will cause too much memory overhead, this value
	// limits the maximum count of LockRow put into the LockService each time, beyond this value it
	// will be converted into a Range of locks
	MaxLockRowCount toml.ByteSize `toml:"max-row-lock-count"`
	// KeepBindTimeout when a locktable is assigned to a lockservice, the lockservice will
	// continuously hold the bind, and if no hold request is received after the configured time,
	// then all bindings for the service will fail.
	KeepBindTimeout toml.Duration `toml:"keep-bind-timeout"`
	// EnableRemoteLocalProxy enable remote local proxy. The proxy used to reduce remote shared
	// lock and unlock request.
	EnableRemoteLocalProxy bool `toml:"enable-remote-local-proxy"`
	// contains filtered or unexported fields
}
Config is the lock service configuration.
type Lock ¶
type Lock struct {
// contains filtered or unexported fields
}
Lock stores specific lock information. Since there are a large number of lock objects in the LockStorage at runtime, this object has been specially designed to save memory usage.
func (Lock) GetLockMode ¶ added in v1.0.0
GetLockMode returns lock mode
func (Lock) IsRangeLock ¶ added in v1.0.0
IsRangeLock returns true if the lock is a range lock.
func (Lock) IterHolders ¶ added in v1.0.0
IterHolders iterates over the lock holders. If there are no holders, the last holder has been closed and the lock is waiting for the first waiter to acquire it.
func (Lock) IterWaiters ¶ added in v1.0.0
IterWaiters iterates over the transactions that are waiting for this lock.
type LockOptions ¶
type LockOptions struct {
	pb.LockOptions
	// contains filtered or unexported fields
}
LockOptions options for lock
type LockService ¶
type LockService interface {
	// GetServiceID return service id
	GetServiceID() string
	// GetConfig returns the lockservice config
	GetConfig() Config
	// Lock locks rows (row or row range determined by the Granularity in options) of a table.
	// Lockservice has no requirement for the format of rows, but requires all rows of a table
	// on a lockservice to be sortable.
	//
	// If a conflict is encountered, the method will block until the conflicting lock is
	// released and held by the current operation, or until it times out.
	//
	// Returns false if conflicts are encountered in FastFail wait policy and ErrDeadLockDetected
	// returns if current operation was aborted by deadlock detection.
	Lock(ctx context.Context, tableID uint64, rows [][]byte, txnID []byte, options pb.LockOptions) (pb.Result, error)
	// Unlock release all locks associated with the transaction. If commitTS is not empty, means
	// the txn was committed.
	Unlock(ctx context.Context, txnID []byte, commitTS timestamp.Timestamp, mutations ...pb.ExtraMutation) error
	// IsOrphanTxn check txn is orphan txn
	IsOrphanTxn(context.Context, []byte) (bool, error)
	// Close close the lock service.
	Close() error
	// GetWaitingList get special txnID's waiting list
	GetWaitingList(ctx context.Context, txnID []byte) (bool, []pb.WaitTxn, error)
	// ForceRefreshLockTableBinds force refresh all lock tables binds
	ForceRefreshLockTableBinds(targets []uint64, matcher func(bind pb.LockTable) bool)
	// GetLockTableBind returns lock table bind
	GetLockTableBind(group uint32, tableID uint64) (pb.LockTable, error)
	// IterLocks iter all locks on current lock service. len(keys) == 2 if is range lock,
	// len(keys) == 1 if is row lock. And keys only valid in current iter func call.
	IterLocks(func(tableID uint64, keys [][]byte, lock Lock) bool)
	// CloseRemoteLockTable close lock table
	CloseRemoteLockTable(group uint32, tableID, version uint64) (bool, error)
}
LockService is the lock service that runs on the CN node. It maintains a set of LockStorage instances internally (one table corresponds to one LockStorage instance). All Lock and Unlock operations on each table are concurrent.
Lock waiting is fair: each Lock has an internal wait queue, and when a Lock is released, the next waiter acquires it in FIFO order. Each element in the wait queue is a transaction ID.
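The FIFO hand-off can be illustrated with a minimal sketch. The waitQueue type and its methods are hypothetical names chosen for this example and are not part of the package; the real implementation stores more state per waiter.

```go
package main

import "fmt"

// waitQueue is a hypothetical FIFO queue of transaction IDs waiting for one
// lock. It is an illustration only, not the lockservice implementation.
type waitQueue struct {
	holder  string   // txn currently holding the lock, "" if the lock is free
	waiters []string // waiting txn IDs in arrival order
}

// acquire grants the lock if it is free, otherwise enqueues the txn.
// It reports whether the txn now holds the lock.
func (q *waitQueue) acquire(txnID string) bool {
	if q.holder == "" {
		q.holder = txnID
		return true
	}
	q.waiters = append(q.waiters, txnID)
	return false
}

// release frees the lock and hands it to the oldest waiter, if any.
// It returns the new holder ("" when nobody is waiting).
func (q *waitQueue) release() string {
	q.holder = ""
	if len(q.waiters) > 0 {
		q.holder = q.waiters[0]
		q.waiters = q.waiters[1:]
	}
	return q.holder
}

func main() {
	q := &waitQueue{}
	q.acquire("txn1") // txn1 holds the lock
	q.acquire("txn2") // txn2 waits
	q.acquire("txn3") // txn3 waits behind txn2
	fmt.Println(q.release()) // txn2
	fmt.Println(q.release()) // txn3
}
```

Waiters are served strictly in arrival order, which is what makes the waiting fair.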
The current lock waiting mechanism can lead to deadlock, so lockservice implements a deadlock detection mechanism internally. To preserve the performance of Lock operations, deadlock detection is not run synchronously with each Lock operation. Instead, when a new waiter is added to the wait queue of any Lock, a set of background goroutines is notified to start deadlock detection for all transactions in that Lock's wait queue.
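One common way to detect such deadlocks is to look for a cycle in a wait-for graph. The following sketch assumes that approach; the waitFor type and hasDeadlock function are hypothetical and do not reflect the detector's actual data structures. In the design described above, a check like this would run in a background goroutine for each transaction in the wait queue.

```go
package main

import "fmt"

// waitFor maps a waiting txn ID to the txn IDs it is blocked on.
// Hypothetical structure, used only for this illustration.
type waitFor map[string][]string

// hasDeadlock reports whether start can reach itself by following
// wait-for edges, i.e. whether start is part of a cycle.
func hasDeadlock(g waitFor, start string) bool {
	visited := map[string]bool{}
	var visit func(txn string) bool
	visit = func(txn string) bool {
		for _, next := range g[txn] {
			if next == start {
				return true // found a path back to start
			}
			if !visited[next] {
				visited[next] = true
				if visit(next) {
					return true
				}
			}
		}
		return false
	}
	return visit(start)
}

func main() {
	g := waitFor{
		"txn1": {"txn2"},
		"txn2": {"txn3"},
		"txn3": {"txn1"}, // closes the cycle txn1 -> txn2 -> txn3 -> txn1
		"txn4": {"txn1"}, // waits on the cycle but is not part of it
	}
	fmt.Println(hasDeadlock(g, "txn1")) // true
	fmt.Println(hasDeadlock(g, "txn4")) // false
}
```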
func GetLockServiceByServiceID ¶ added in v0.8.0
func GetLockServiceByServiceID(serviceID string) LockService
GetLockServiceByServiceID gets the lockservice instance for the given service ID from the process-level runtime.
func NewLockService ¶
func NewLockService(cfg Config, opts ...Option) LockService
NewLockService creates a lock service instance.
type LockStorage ¶
type LockStorage interface {
	// Add we use kv to store the lock. Key is a locked row or a row range. Value is the
	// TxnID.
	Add(key []byte, value Lock)
	// Get returns the value of the given key
	Get(key []byte) (Lock, bool)
	// Len returns number of the locks in the storage
	Len() int
	// Delete delete lock from the storage
	Delete(key []byte) (Lock, bool)
	// Seek returns the first KV Pair that is >= the given key
	Seek(key []byte) ([]byte, Lock, bool)
	// Prev returns the first KV Pair that is < the given key
	Prev(key []byte) ([]byte, Lock, bool)
	// Range range in [start, end), if end == nil, no upperBounded
	Range(start []byte, end []byte, fn func([]byte, Lock) bool)
	// Iter iter all values
	Iter(func([]byte, Lock) bool)
	// Clear clear the lock
	Clear()
}
LockStorage is the store that holds the locks. One storage instance holds all the locks of one table. LockStorage does not need to be thread-safe.
All locks are stored in key order in the LockStorage, so lock conflicts can be easily detected.
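To show why ordered storage makes conflict checks cheap, here is a minimal sketch that keeps keys sorted in a slice and uses a Seek-style lookup to test whether any locked key falls inside a range. The sortedKeys type is a hypothetical stand-in; the source file list suggests the package uses a btree-backed storage, which this sketch does not reproduce.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// sortedKeys is a hypothetical stand-in for an ordered lock store. It keeps
// only the locked keys, sorted by bytes.Compare.
type sortedKeys [][]byte

// index returns the position of the first stored key >= key.
func (s sortedKeys) index(key []byte) int {
	return sort.Search(len(s), func(i int) bool {
		return bytes.Compare(s[i], key) >= 0
	})
}

// add inserts key at its sorted position unless it is already present.
func (s sortedKeys) add(key []byte) sortedKeys {
	i := s.index(key)
	if i < len(s) && bytes.Equal(s[i], key) {
		return s
	}
	s = append(s, nil)
	copy(s[i+1:], s[i:])
	s[i] = key
	return s
}

// seek returns the first stored key >= key, mirroring the Seek contract.
func (s sortedKeys) seek(key []byte) ([]byte, bool) {
	i := s.index(key)
	if i == len(s) {
		return nil, false
	}
	return s[i], true
}

// conflictsWithRange reports whether any stored key lies in [start, end).
// One seek is enough because the keys are ordered.
func (s sortedKeys) conflictsWithRange(start, end []byte) bool {
	k, ok := s.seek(start)
	return ok && bytes.Compare(k, end) < 0
}

func main() {
	var s sortedKeys
	for _, k := range []string{"b", "f", "d"} {
		s = s.add([]byte(k))
	}
	fmt.Println(s.conflictsWithRange([]byte("c"), []byte("e"))) // true, "d" is locked
	fmt.Println(s.conflictsWithRange([]byte("g"), []byte("z"))) // false
}
```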
type LockTableAllocator ¶ added in v0.8.0
type LockTableAllocator interface {
	// Get get the original LockTable data corresponding to a Table. If there is no
	// corresponding binding, then the CN binding of the current request will be used.
	Get(serviceID string, group uint32, tableID, originTableID uint64, sharding pb.Sharding) pb.LockTable
	// KeepLockTableBind once a cn is bound to a Table, a heartbeat needs to be sent
	// periodically to keep the binding in place. If no heartbeat is sent for a long
	// period of time to maintain the binding, the binding will become invalid.
	KeepLockTableBind(serviceID string) bool
	// Valid check for changes in the binding relationship of a specific lock-table.
	Valid(serviceID string, txnID []byte, binds []pb.LockTable) ([]uint64, error)
	// AddCannotCommit add cannot commit txn.
	AddCannotCommit(values []pb.OrphanTxn) [][]byte
	// Close close the lock table allocator
	Close() error

	// GetLatest get latest lock table bind
	GetLatest(groupID uint32, tableID uint64) pb.LockTable
	// GetVersion get latest version
	GetVersion() uint64
}
LockTableAllocator is used to manage the binding relationship between LockTable and LockService, and to check the validity of the bindings held by a transaction when the transaction is committed.
A LockTable is bound to only one LockService at a time, and once the binding relationship between a LockTable and a LockService changes, the binding version also changes. Once a LockService goes offline (crash or network partition), the LockTable is bound to another LockService.
During the [Txn-Lock, Txn-Commit] time period, if the binding between LockTable and LockService changes, we need to be able to detect it and get the transaction rolled back, because the Lock acquired by this transaction is not valid and cannot resolve W-W conflicts.
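The commit-time check can be sketched as a comparison between the bindings a transaction observed when it locked and the latest bindings known to the allocator. The bind struct and changedBinds function below are hypothetical simplifications; the real pb.LockTable type and the Valid method carry more information.

```go
package main

import "fmt"

// bind is a hypothetical, simplified view of a lock table binding.
type bind struct {
	Table     uint64
	ServiceID string
	Version   uint64
}

// changedBinds returns the IDs of tables whose binding held by the
// transaction no longer matches the latest binding. A non-empty result
// means the transaction's locks are no longer valid and it must roll back.
func changedBinds(latest map[uint64]bind, held []bind) []uint64 {
	var changed []uint64
	for _, h := range held {
		cur, ok := latest[h.Table]
		if !ok || cur.ServiceID != h.ServiceID || cur.Version != h.Version {
			changed = append(changed, h.Table)
		}
	}
	return changed
}

func main() {
	latest := map[uint64]bind{
		1: {Table: 1, ServiceID: "cn1", Version: 1},
		2: {Table: 2, ServiceID: "cn2", Version: 5}, // rebound after cn1 went offline
	}
	held := []bind{
		{Table: 1, ServiceID: "cn1", Version: 1},
		{Table: 2, ServiceID: "cn1", Version: 4}, // stale binding
	}
	fmt.Println(changedBinds(latest, held)) // [2]
}
```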
func NewLockTableAllocator ¶ added in v0.8.0
func NewLockTableAllocator(
	address string,
	keepBindTimeout time.Duration,
	cfg morpc.Config,
	opts ...AllocatorOption,
) LockTableAllocator
NewLockTableAllocator creates a memory-based lock table allocator.
type LockTableKeeper ¶ added in v0.8.0
type LockTableKeeper interface {
	// Close close the keeper
	Close() error
}
LockTableKeeper is used to keep a heartbeat with the LockTableAllocator so that the LockTable bindings stay valid. It also receives information about changes to the bindings between LockTable and LockService.
func NewLockTableKeeper ¶ added in v0.8.0
func NewLockTableKeeper(
	serviceID string,
	client Client,
	keepLockTableBindInterval time.Duration,
	keepRemoteLockInterval time.Duration,
	groupTables *lockTableHolders,
	service *service,
) LockTableKeeper
NewLockTableKeeper creates a locktable keeper. An internal timer is started to send a keepalive request to the lockTableAllocator every interval, so this interval needs to be much smaller than the lockTableAllocator's actual timeout.
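The relationship between the keepalive interval and the allocator timeout can be illustrated with a ticker loop. This is a minimal sketch: keepAlive and keepAliveFor are hypothetical helpers, and the 10ms and 100ms values are arbitrary choices for the example, not defaults of the package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// keepAlive calls send once per interval until ctx is done and returns the
// number of heartbeats sent. send stands in for the keepalive request.
func keepAlive(ctx context.Context, interval time.Duration, send func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case <-ticker.C:
			send()
			n++
		}
	}
}

// keepAliveFor runs keepAlive for roughly total and returns the heartbeat count.
func keepAliveFor(total, interval time.Duration) int {
	ctx, cancel := context.WithTimeout(context.Background(), total)
	defer cancel()
	return keepAlive(ctx, interval, func() {})
}

func main() {
	// The interval (10ms) is much smaller than the assumed timeout (100ms),
	// so several heartbeats fit inside one timeout window and a single
	// delayed or lost heartbeat does not invalidate the binding.
	n := keepAliveFor(100*time.Millisecond, 10*time.Millisecond)
	fmt.Println("heartbeats sent within one timeout window:", n)
}
```

If the interval were close to the timeout, one slow request could let the binding expire, which is why the two values should differ by a comfortable margin.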
type RequestHandleFunc ¶ added in v0.8.0
type RequestHandleFunc func(context.Context, context.CancelFunc, *pb.Request, *pb.Response, morpc.ClientSession)
RequestHandleFunc request handle func
type Server ¶ added in v0.8.0
type Server interface {
	// Start start the txn server
	Start() error
	// Close the txn server
	Close() error
	// RegisterMethodHandler register txn request handler func
	RegisterMethodHandler(pb.Method, RequestHandleFunc)
}
Server receives and processes requests from Client.
type ServerOption ¶ added in v0.8.0
type ServerOption func(*server)
ServerOption server option
func WithServerMessageFilter ¶ added in v0.8.0
func WithServerMessageFilter(filter func(*pb.Request) bool) ServerOption
WithServerMessageFilter sets the filter func. Requests can be modified or filtered out by the filter before they are processed by the handler.
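The option follows the functional options pattern. The sketch below shows how such a filter option could be wired up, using hypothetical lowercase stand-ins (request, server, withMessageFilter) instead of the package's own types. It assumes that a filter returning false drops the request, which the documentation above does not state explicitly.

```go
package main

import "fmt"

// request and server are hypothetical stand-ins for *pb.Request and the
// package's unexported server type.
type request struct{ Method string }

type server struct {
	filter func(*request) bool
}

// serverOption mirrors the shape of ServerOption: a func that mutates the server.
type serverOption func(*server)

// withMessageFilter returns an option that installs the filter.
func withMessageFilter(filter func(*request) bool) serverOption {
	return func(s *server) { s.filter = filter }
}

// newServer applies the options in order.
func newServer(opts ...serverOption) *server {
	s := &server{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// handle reports whether the request passed the filter and reached the handler.
// Assumption: a filter that returns false drops the request.
func (s *server) handle(req *request) bool {
	if s.filter != nil && !s.filter(req) {
		return false
	}
	return true
}

func main() {
	s := newServer(withMessageFilter(func(r *request) bool {
		return r.Method != "Unlock" // drop Unlock requests, e.g. to simulate loss in a test
	}))
	fmt.Println(s.handle(&request{Method: "Lock"}))   // true
	fmt.Println(s.handle(&request{Method: "Unlock"})) // false
}
```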
Source Files ¶
- btree_storage.go
- cfg.go
- deadlock.go
- lock.go
- lock_table_allocator.go
- lock_table_keeper.go
- lock_table_local.go
- lock_table_proxy.go
- lock_table_remote.go
- log.go
- reuse.go
- rpc.go
- service.go
- service_forward.go
- service_observability.go
- service_remote.go
- slice.go
- test_helper.go
- txn.go
- types.go
- waiter.go
- waiter_events.go
- waiter_queue.go