lockservice

package
v1.2.3-hotfix-20241010
Warning

This package is not in the latest version of its module.

Published: Oct 11, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	// ErrDeadlockDetectorClosed deadlock detector is closed
	ErrDeadlockDetectorClosed = moerr.NewInvalidStateNoCtx("deadlock detector is closed")
	// ErrTxnNotFound txn not found
	ErrTxnNotFound = moerr.NewInvalidStateNoCtx("txn not found")
	// ErrMergeRangeLockNotSupport merge range lock not support with shared lock
	ErrMergeRangeLockNotSupport = moerr.NewNotSupportedNoCtx("merge range lock not support with shared lock")
	// ErrDeadLockDetected dead lock detected
	ErrDeadLockDetected = moerr.NewDeadLockDetectedNoCtx()
	// ErrDeadlockCheckBusy dead lock check is busy
	ErrDeadlockCheckBusy = moerr.NewDeadlockCheckBusyNoCtx()
	// ErrLockTableBindChanged lock table and lock service bind changed
	ErrLockTableBindChanged = moerr.NewLockTableBindChangedNoCtx()
	// ErrLockTableNotFound lock table not found on remote lock service
	ErrLockTableNotFound = moerr.NewLockTableNotFoundNoCtx()
	// ErrLockConflict lock option conflict
	ErrLockConflict = moerr.NewLockConflictNoCtx()
)

Functions

func RunLockServicesForTest added in v0.8.0

func RunLockServicesForTest(
	level zapcore.Level,
	serviceIDs []string,
	lockTableBindTimeout time.Duration,
	fn func(LockTableAllocator, []LockService),
	adjustConfig func(*Config),
	opts ...Option,
)

RunLockServicesForTest starts a lock table allocator and some lock services for testing.

func SetLockServiceByServiceID added in v0.8.0

func SetLockServiceByServiceID(serviceID string, value LockService)

SetLockServiceByServiceID sets the lockservice instance in the process-level runtime.

func ShardingByRow added in v1.2.3

func ShardingByRow(row []byte) uint64

func WaitWaiters added in v0.8.0

func WaitWaiters(
	ls LockService,
	group uint32,
	table uint64,
	key []byte,
	waitersCount int) error

WaitWaiters waits until the lock on key in the given group and table has waitersCount waiters.

Types

type AllocatorOption added in v1.2.0

type AllocatorOption func(*lockTableAllocator)

type Client added in v0.8.0

type Client interface {
	// Send send request to other lock service, and wait for a response synchronously.
	Send(context.Context, *pb.Request) (*pb.Response, error)
	// AsyncSend async send request to other lock service.
	AsyncSend(context.Context, *pb.Request) (*morpc.Future, error)
	// Close close the client
	Close() error
}

Client is used to send lock table operations to other services:

 1. lock service <-> lock service
 2. lock service <-> lock table allocator
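The difference between Send and AsyncSend can be pictured with a small standalone sketch. It does not use morpc or pb: the `future` type, `asyncSend`, `send`, and the string-based requests are hypothetical stand-ins, and building the synchronous call on top of the asynchronous one is an assumption about one reasonable design, not a description of this package's internals.

```go
package main

import "fmt"

// future is a hypothetical handle for a response that arrives later.
type future struct {
	ch chan string
}

// Get blocks until the response is available.
func (f *future) Get() string { return <-f.ch }

// asyncSend starts handling req in the background and returns immediately.
func asyncSend(req string, handle func(string) string) *future {
	f := &future{ch: make(chan string, 1)}
	go func() { f.ch <- handle(req) }()
	return f
}

// send is the synchronous variant: it waits for the response before returning.
func send(req string, handle func(string) string) string {
	return asyncSend(req, handle).Get()
}

func main() {
	handle := func(req string) string { return "handled: " + req }

	// Synchronous: the caller blocks inside send.
	fmt.Println(send("lock", handle))

	// Asynchronous: the caller can do other work before calling Get.
	f := asyncSend("unlock", handle)
	fmt.Println(f.Get())
}
```

The asynchronous form is useful when a caller wants to issue several requests before waiting on any of them.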

func NewClient added in v0.8.0

func NewClient(cfg morpc.Config, opts ...ClientOption) (Client, error)

type ClientOption added in v1.2.0

type ClientOption func(c *client)

func WithMOCluster added in v1.2.0

func WithMOCluster(cluster clusterservice.MOCluster) ClientOption

type Config added in v0.8.0

type Config struct {
	// ServiceID service id
	ServiceID string `toml:"-"`
	// RPC rpc config
	RPC morpc.Config `toml:"-"`
	// TxnIterFunc used to iterate all active transactions in current cn
	TxnIterFunc func(func([]byte) bool) `toml:"-"`

	// ListenAddress lock service listen address for receiving lock requests
	ListenAddress string `toml:"listen-address"`
	// ServiceAddress service address for communication, if this address is not set, use
	// ListenAddress as the communication address.
	ServiceAddress string `toml:"service-address"`
	// MaxFixedSliceSize lockservice uses fixedSlice to store all lock information, a pool
	// of fixedSlice will be built internally, there are various specifications of fixedSlice,
	// this value sets how big the maximum specification of fixedSlice is.
	MaxFixedSliceSize toml.ByteSize `toml:"max-fixed-slice-size"`
	// KeepBindDuration how often to send a heartbeat to keep locktables bound to the current service
	KeepBindDuration toml.Duration `toml:"keep-bind-duration"`
	// KeepRemoteLockDuration how often to send a heartbeat to maintain a lock on a remote
	// locktable.
	KeepRemoteLockDuration toml.Duration `toml:"keep-remote-lock-duration"`
	// RemoteLockTimeout how long to wait for a heartbeat that maintains the remote lock
	// before releasing the lock
	RemoteLockTimeout toml.Duration `toml:"remote-lock-timeout"`
	// MaxLockRowCount each time a lock is added, some LockRows are stored in the lockservice. If
	// too many LockRows are put in each time, it will cause too much memory overhead. This value
	// limits the maximum count of LockRows put into the LockService each time; beyond this value
	// they will be converted into a Range of locks
	MaxLockRowCount toml.ByteSize `toml:"max-row-lock-count"`
	// KeepBindTimeout when a locktable is assigned to a lockservice, the lockservice will
	// continuously hold the bind, and if no hold request is received after the configured time,
	// then all bindings for the service will fail.
	KeepBindTimeout toml.Duration `toml:"keep-bind-timeout"`
	// EnableRemoteLocalProxy enables the remote local proxy. The proxy is used to reduce remote
	// shared lock and unlock requests.
	EnableRemoteLocalProxy bool `toml:"enable-remote-local-proxy"`
	// contains filtered or unexported fields
}

Config lock service config
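The MaxLockRowCount comment says that row locks beyond the limit are converted into a range of locks. The following is a minimal standalone sketch of that idea. The helper `compactRows` is hypothetical, and collapsing the rows into the single range from the smallest to the largest key is an assumption about how such a conversion could work, not a statement of what lockservice does.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// compactRows is a hypothetical helper. If there are more rows than maxRows,
// it returns the two bounds of one covering range instead of the rows themselves.
func compactRows(rows [][]byte, maxRows int) (keys [][]byte, isRange bool) {
	if len(rows) <= maxRows {
		return rows, false
	}
	// Sort a copy so the caller's slice is left untouched.
	sorted := make([][]byte, len(rows))
	copy(sorted, rows)
	sort.Slice(sorted, func(i, j int) bool {
		return bytes.Compare(sorted[i], sorted[j]) < 0
	})
	return [][]byte{sorted[0], sorted[len(sorted)-1]}, true
}

func main() {
	rows := [][]byte{[]byte("k3"), []byte("k1"), []byte("k4"), []byte("k2")}
	keys, isRange := compactRows(rows, 3)
	fmt.Printf("%q range=%v\n", keys, isRange)
}
```

Under this assumption the trade-off is memory for precision: one range entry replaces many row entries, but the range may also cover rows that were never requested.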

func (*Config) Validate added in v0.8.0

func (c *Config) Validate()

Validate validates the config.

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

Lock stores specific lock information. Since there are a large number of lock objects in the LockStorage at runtime, this object has been specially designed to save memory usage.

func (Lock) GetLockMode added in v1.0.0

func (l Lock) GetLockMode() pb.LockMode

GetLockMode returns lock mode

func (Lock) IsRangeLock added in v1.0.0

func (l Lock) IsRangeLock() bool

IsRangeLock returns true if the lock is a range lock.

func (Lock) IterHolders added in v1.0.0

func (l Lock) IterHolders(fn func(holder pb.WaitTxn) bool)

IterHolders iterates over the lock holders. If there are no holders, the last holder has been closed and the lock is waiting for the first waiter to acquire it.

func (Lock) IterWaiters added in v1.0.0

func (l Lock) IterWaiters(fn func(waiter pb.WaitTxn) bool)

IterWaiters iterates over the txns that are waiting for this lock.

func (Lock) String added in v0.8.0

func (l Lock) String() string

String implements fmt.Stringer.

type LockOptions

type LockOptions struct {
	pb.LockOptions
	// contains filtered or unexported fields
}

LockOptions options for lock

type LockService

type LockService interface {
	// GetServiceID return service id
	GetServiceID() string
	// GetConfig returns the lockservice config
	GetConfig() Config
	// Lock locks rows (a row or a row range, determined by the Granularity in options) of a table.
	// Lockservice has no requirement for the format of rows, but requires all rows of a table on a
	// lockservice to be sortable.
	//
	// If a conflict is encountered, the method will block until the conflicting lock is
	// released and held by the current operation, or until it times out.
	//
	// Returns false if conflicts are encountered in FastFail wait policy, and returns
	// ErrDeadLockDetected if the current operation was aborted by deadlock detection.
	Lock(ctx context.Context, tableID uint64, rows [][]byte, txnID []byte, options pb.LockOptions) (pb.Result, error)
	// Unlock release all locks associated with the transaction. If commitTS is not empty, means
	// the txn was committed.
	Unlock(ctx context.Context, txnID []byte, commitTS timestamp.Timestamp, mutations ...pb.ExtraMutation) error
	// IsOrphanTxn check txn is orphan txn
	IsOrphanTxn(context.Context, []byte) (bool, error)

	// Close close the lock service.
	Close() error

	// GetWaitingList get special txnID's waiting list
	GetWaitingList(ctx context.Context, txnID []byte) (bool, []pb.WaitTxn, error)
	// ForceRefreshLockTableBinds force refresh all lock tables binds
	ForceRefreshLockTableBinds(targets []uint64, matcher func(bind pb.LockTable) bool)
	// GetLockTableBind returns lock table bind
	GetLockTableBind(group uint32, tableID uint64) (pb.LockTable, error)
	// IterLocks iter all locks on current lock service. len(keys) == 2 if is range lock,
	// len(keys) == 1 if is row lock. And keys only valid in current iter func call.
	IterLocks(func(tableID uint64, keys [][]byte, lock Lock) bool)
	// CloseRemoteLockTable close lock table
	CloseRemoteLockTable(group uint32, tableID, version uint64) (bool, error)
}

LockService is the lock service running on the CN node. The lockservice maintains a set of LockStorage instances internally (one table corresponds to one LockStorage instance). All Lock and Unlock operations on each Table are concurrent.

Lock waiting is fair. Internally there is a wait queue for each Lock, and when a Lock is released, the next waiter acquires it in FIFO order. The elements in the wait queue are transaction IDs.
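The FIFO hand-off can be sketched in a few lines. This is a standalone illustration with hypothetical `waitQueue` and `lock` types and a single exclusive holder. It is not the package's data structure, and the real service blocks callers rather than returning false.

```go
package main

import "fmt"

// waitQueue is a hypothetical FIFO queue of waiting transaction IDs.
type waitQueue struct {
	txnIDs []string
}

// put appends a transaction to the tail of the queue.
func (q *waitQueue) put(txnID string) { q.txnIDs = append(q.txnIDs, txnID) }

// pop removes and returns the transaction at the head of the queue.
func (q *waitQueue) pop() (string, bool) {
	if len(q.txnIDs) == 0 {
		return "", false
	}
	head := q.txnIDs[0]
	q.txnIDs = q.txnIDs[1:]
	return head, true
}

// lock is a hypothetical exclusive lock with one holder and a wait queue.
type lock struct {
	holder  string
	waiters waitQueue
}

// acquire returns true if txnID now holds the lock; otherwise txnID is queued.
func (l *lock) acquire(txnID string) bool {
	if l.holder == "" {
		l.holder = txnID
		return true
	}
	l.waiters.put(txnID)
	return false
}

// release hands the lock to the longest-waiting transaction and returns the
// new holder, or "" if nobody was waiting.
func (l *lock) release() string {
	next, _ := l.waiters.pop()
	l.holder = next
	return next
}

func main() {
	var l lock
	l.acquire("txn1") // txn1 holds the lock
	l.acquire("txn2") // queued first
	l.acquire("txn3") // queued second
	fmt.Println(l.release()) // prints "txn2"
	fmt.Println(l.release()) // prints "txn3"
}
```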

The current lock waiting mechanism can lead to deadlocks, so lockservice implements a deadlock detection mechanism internally. To preserve the performance of Lock operations, deadlock detection is not run synchronously with each Lock operation. Instead, when a new waiter is added to the wait queue of any Lock, a set of background goroutines is notified to start deadlock detection for all transactions in that Lock's wait queue.
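A common way to detect such deadlocks is to look for a cycle in a wait-for graph, and the sketch below does that on a background goroutine fed by a channel. The `waitFor` map, `hasDeadlock`, and `detect` are hypothetical names, and cycle detection by depth-first search is an assumed technique chosen for illustration. The source does not say which algorithm the package uses.

```go
package main

import "fmt"

// waitFor maps a transaction ID to the transactions it is waiting on.
type waitFor map[string][]string

// hasDeadlock reports whether start can reach itself by following wait-for
// edges, that is, whether start is part of a cycle.
func hasDeadlock(g waitFor, start string) bool {
	visited := map[string]bool{}
	var visit func(txn string) bool
	visit = func(txn string) bool {
		for _, next := range g[txn] {
			if next == start {
				return true
			}
			if !visited[next] {
				visited[next] = true
				if visit(next) {
					return true
				}
			}
		}
		return false
	}
	return visit(start)
}

// detect consumes new-waiter notifications in the background and reports
// every notified transaction that is part of a cycle. The graph is only read
// here, so it must not be modified while detect is running.
func detect(g waitFor, waiters <-chan string, deadlocked chan<- string) {
	for txn := range waiters {
		if hasDeadlock(g, txn) {
			deadlocked <- txn
		}
	}
	close(deadlocked)
}

func main() {
	g := waitFor{
		"txn1": {"txn2"},
		"txn2": {"txn3"},
		"txn3": {"txn1"}, // closes the cycle txn1 -> txn2 -> txn3 -> txn1
		"txn4": {"txn1"}, // waits on the cycle but is not part of it
	}
	waiters := make(chan string, 2)
	deadlocked := make(chan string, 2)
	go detect(g, waiters, deadlocked)

	// The Lock path only enqueues a notification and does not wait for the check.
	waiters <- "txn1"
	waiters <- "txn4"
	close(waiters)

	for txn := range deadlocked {
		fmt.Println("deadlock detected for", txn)
	}
}
```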

func GetLockServiceByServiceID added in v0.8.0

func GetLockServiceByServiceID(serviceID string) LockService

GetLockServiceByServiceID gets the lockservice instance by service ID from the process-level runtime.
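A process-level registry keyed by service ID can be sketched with a sync.Map. Everything in this example is hypothetical: the `lockService` interface is a one-method stand-in, `setByServiceID` and `getByServiceID` are illustrative names, and returning nil for an unknown ID is an assumption. The source does not say how the real function behaves when the ID is missing.

```go
package main

import (
	"fmt"
	"sync"
)

// lockService is a hypothetical stand-in for the LockService interface.
type lockService interface {
	GetServiceID() string
}

type fakeService struct{ id string }

func (f fakeService) GetServiceID() string { return f.id }

// services is a process-wide registry keyed by service ID.
var services sync.Map

// setByServiceID stores (or replaces) the instance registered under id.
func setByServiceID(id string, v lockService) { services.Store(id, v) }

// getByServiceID returns the instance registered under id, or nil if there
// is none (an assumption made for this sketch).
func getByServiceID(id string) lockService {
	v, ok := services.Load(id)
	if !ok {
		return nil
	}
	return v.(lockService)
}

func main() {
	setByServiceID("cn-1", fakeService{id: "cn-1"})
	fmt.Println(getByServiceID("cn-1").GetServiceID())
	fmt.Println(getByServiceID("cn-2") == nil)
}
```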

func NewLockService

func NewLockService(
	cfg Config,
	opts ...Option) LockService

NewLockService creates a lock service instance.

type LockStorage

type LockStorage interface {
	// Add we use kv to store the lock. Key is a locked row or a row range. Value is the
	// TxnID.
	Add(key []byte, value Lock)
	// Get returns the value of the given key
	Get(key []byte) (Lock, bool)
	// Len returns number of the locks in the storage
	Len() int
	// Delete delete lock from the storage
	Delete(key []byte) (Lock, bool)
	// Seek returns the first KV Pair that is >= the given key
	Seek(key []byte) ([]byte, Lock, bool)
	// Prev returns the first KV Pair that is < the given key
	Prev(key []byte) ([]byte, Lock, bool)
	// Range iterates over [start, end). If end == nil, there is no upper bound.
	Range(start []byte, end []byte, fn func([]byte, Lock) bool)
	// Iter iter all values
	Iter(func([]byte, Lock) bool)
	// Clear clear the lock
	Clear()
}

LockStorage is the store that holds the locks. A storage instance corresponds to all the locks of a table. The LockStorage does not need to be thread-safe.

All locks are stored in order in the LockStorage, so lock conflicts can be easily detected.
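To see why ordered storage makes conflict detection cheap, consider this standalone sketch. It keeps entries in a sorted slice and uses a Seek-style lookup (first key >= the given key). The `storage`, `entry`, and `kind` types are hypothetical, and representing a range lock as a start entry plus an end entry is an assumption made for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

type kind int

const (
	row kind = iota
	rangeStart
	rangeEnd
)

// entry is a hypothetical stored lock: a key plus what the key represents.
type entry struct {
	key  []byte
	kind kind
}

// storage keeps entries sorted by key.
type storage struct {
	entries []entry
}

// index returns the position of the first entry whose key is >= key.
func (s *storage) index(key []byte) int {
	return sort.Search(len(s.entries), func(i int) bool {
		return bytes.Compare(s.entries[i].key, key) >= 0
	})
}

// add inserts an entry at its sorted position.
func (s *storage) add(key []byte, k kind) {
	i := s.index(key)
	s.entries = append(s.entries, entry{})
	copy(s.entries[i+1:], s.entries[i:])
	s.entries[i] = entry{key: key, kind: k}
}

// seek returns the first entry whose key is >= key.
func (s *storage) seek(key []byte) (entry, bool) {
	i := s.index(key)
	if i == len(s.entries) {
		return entry{}, false
	}
	return s.entries[i], true
}

// conflicts reports whether locking the single row key would conflict:
// either the key itself is stored, or the next entry is the end of a range,
// which means key lies inside that range.
func (s *storage) conflicts(key []byte) bool {
	e, ok := s.seek(key)
	if !ok {
		return false
	}
	return bytes.Equal(e.key, key) || e.kind == rangeEnd
}

func main() {
	var s storage
	s.add([]byte("b"), row)
	s.add([]byte("d"), rangeStart)
	s.add([]byte("f"), rangeEnd)
	for _, k := range []string{"a", "b", "c", "e", "g"} {
		fmt.Println(k, s.conflicts([]byte(k)))
	}
}
```

One lookup is enough to decide whether a row collides with a row lock or falls inside a range lock, which is the benefit of keeping the keys ordered.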

type LockTableAllocator added in v0.8.0

type LockTableAllocator interface {
	// Get get the original LockTable data corresponding to a Table. If there is no
	// corresponding binding, then the CN binding of the current request will be used.
	Get(serviceID string, group uint32, tableID, originTableID uint64, sharding pb.Sharding) pb.LockTable
	// KeepLockTableBind once a cn is bound to a Table, a heartbeat needs to be sent
	// periodically to keep the binding in place. If no heartbeat is sent for a long
	// period of time to maintain the binding, the binding will become invalid.
	KeepLockTableBind(serviceID string) bool
	// Valid check for changes in the binding relationship of a specific lock-table.
	Valid(serviceID string, txnID []byte, binds []pb.LockTable) ([]uint64, error)
	// AddCannotCommit add cannot commit txn.
	AddCannotCommit(values []pb.OrphanTxn) [][]byte
	// Close close the lock table allocator
	Close() error

	// GetLatest get latest lock table bind
	GetLatest(groupID uint32, tableID uint64) pb.LockTable

	// GetVersion get latest version
	GetVersion() uint64
}

LockTableAllocator is used to manage the binding relationship between LockTable and LockService, and to check the validity of the binding held by the transaction when the transaction is committed.

A LockTable will only be bound by a LockService, and once the binding relationship between a LockTable and a LockService changes, the binding version will also change. Once a LockService goes offline (crash or network partition), the LockTable is bound to another LockService.

During the [Txn-Lock, Txn-Commit] time period, if the binding between LockTable and LockService changes, we need to be able to detect it and get the transaction rolled back, because the Lock acquired by this transaction is not valid and cannot resolve W-W conflicts.
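The commit-time check can be sketched as a comparison between the binds a transaction recorded when it took its locks and the binds the allocator currently holds. The `bind` and `allocator` types and their methods are hypothetical, and using one global version counter is an assumption. The sketch only illustrates the version-comparison idea described above.

```go
package main

import "fmt"

// bind is a hypothetical record of which service a lock table is bound to.
type bind struct {
	serviceID string
	version   uint64
}

// allocator tracks the current bind of each table.
type allocator struct {
	version uint64
	binds   map[uint64]bind
}

func newAllocator() *allocator {
	return &allocator{binds: map[uint64]bind{}}
}

// get returns the current bind of tableID. If the table is unbound, it is
// bound to the requesting service.
func (a *allocator) get(serviceID string, tableID uint64) bind {
	if b, ok := a.binds[tableID]; ok {
		return b
	}
	return a.rebind(serviceID, tableID)
}

// rebind binds tableID to serviceID under a fresh version, for example after
// the previous service went offline.
func (a *allocator) rebind(serviceID string, tableID uint64) bind {
	a.version++
	b := bind{serviceID: serviceID, version: a.version}
	a.binds[tableID] = b
	return b
}

// changed returns the tables whose current bind differs from the bind the
// transaction recorded when it acquired its locks.
func (a *allocator) changed(held map[uint64]bind) []uint64 {
	var tables []uint64
	for tableID, b := range held {
		if a.binds[tableID] != b {
			tables = append(tables, tableID)
		}
	}
	return tables
}

func main() {
	a := newAllocator()
	// The transaction locks rows in tables 1 and 2 and remembers the binds.
	held := map[uint64]bind{1: a.get("cn1", 1), 2: a.get("cn1", 2)}

	// Before commit, table 1 is rebound to another service.
	a.rebind("cn2", 1)

	if tables := a.changed(held); len(tables) > 0 {
		fmt.Println("bind changed, roll back; tables:", tables)
	}
}
```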

func NewLockTableAllocator added in v0.8.0

func NewLockTableAllocator(
	address string,
	keepBindTimeout time.Duration,
	cfg morpc.Config,
	opts ...AllocatorOption,
) LockTableAllocator

NewLockTableAllocator creates a memory-based lock table allocator.

type LockTableKeeper added in v0.8.0

type LockTableKeeper interface {
	// Close close the keeper
	Close() error
}

LockTableKeeper is used to keep a heartbeat with the LockTableAllocator to keep the LockTable bind. It also gets the changed info of the LockTable and LockService bind.

func NewLockTableKeeper added in v0.8.0

func NewLockTableKeeper(
	serviceID string,
	client Client,
	keepLockTableBindInterval time.Duration,
	keepRemoteLockInterval time.Duration,
	groupTables *lockTableHolders,
	service *service) LockTableKeeper

NewLockTableKeeper creates a locktable keeper. An internal timer is started to send a keepalive request to the lockTableAllocator every interval, so this interval needs to be much smaller than the real lockTableAllocator's timeout.
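A timer-driven keeper of this kind can be sketched with a time.Ticker and a stop channel. The `keeper` type, `newKeeper`, and `runKeeperDemo` are hypothetical, the keepalive is a plain callback instead of an RPC, and the 1ms interval is chosen only to keep the demo short.

```go
package main

import (
	"fmt"
	"time"
)

// keeper is a hypothetical heartbeat loop that can be stopped with Close.
type keeper struct {
	stop chan struct{}
	done chan struct{}
}

// newKeeper starts a goroutine that calls keepalive once per interval.
// The interval should be much smaller than the receiver's timeout, so that a
// few delayed or lost keepalives do not make the bind expire.
func newKeeper(interval time.Duration, keepalive func()) *keeper {
	k := &keeper{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(k.done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				keepalive()
			case <-k.stop:
				return
			}
		}
	}()
	return k
}

// Close stops the heartbeat loop and waits for the goroutine to exit.
func (k *keeper) Close() error {
	close(k.stop)
	<-k.done
	return nil
}

// runKeeperDemo runs a keeper briefly and returns how many keepalives were sent.
func runKeeperDemo() int {
	beats := 0
	k := newKeeper(time.Millisecond, func() { beats++ })
	time.Sleep(50 * time.Millisecond)
	// Close waits for the goroutine to exit, so reading beats afterwards is safe.
	k.Close()
	return beats
}

func main() {
	fmt.Println("keepalives sent:", runKeeperDemo())
}
```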

type Option added in v1.2.0

type Option func(s *service)

Option lockservice option

func WithWait added in v1.2.0

func WithWait(wait func()) Option

WithWait sets a wait func that is used to wait until some condition is ready.
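Option follows the functional options pattern: each option is a function that mutates the service being built. The sketch below reproduces the pattern in isolation. The `service` struct, its `wait` field, `newService`, and `start` are hypothetical, and calling the wait func at the beginning of `start` is an assumption. The source does not say when the real service invokes it.

```go
package main

import "fmt"

// service is a hypothetical stand-in for the unexported service type.
type service struct {
	wait func()
}

// Option mutates a service during construction.
type Option func(s *service)

// WithWait installs a func that blocks until some condition is ready.
func WithWait(wait func()) Option {
	return func(s *service) { s.wait = wait }
}

// newService applies the defaults first and then every option in order.
func newService(opts ...Option) *service {
	s := &service{wait: func() {}} // default: nothing to wait for
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// start waits for the configured condition before doing any work.
func (s *service) start() {
	s.wait()
	fmt.Println("service started")
}

func main() {
	ready := make(chan struct{})
	s := newService(WithWait(func() { <-ready }))

	close(ready) // the condition becomes ready
	s.start()
}
```

The pattern keeps the constructor signature stable: new behavior is added as a new option function rather than a new parameter.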

type RequestHandleFunc added in v0.8.0

type RequestHandleFunc func(context.Context, context.CancelFunc, *pb.Request, *pb.Response, morpc.ClientSession)

RequestHandleFunc request handle func

type Server added in v0.8.0

type Server interface {
	// Start start the txn server
	Start() error
	// Close the txn server
	Close() error
	// RegisterMethodHandler register txn request handler func
	RegisterMethodHandler(pb.Method, RequestHandleFunc)
}

Server receives and processes requests from Client.

func NewServer added in v0.8.0

func NewServer(
	address string,
	cfg morpc.Config,
	opts ...ServerOption) (Server, error)

NewServer creates a lockservice server. One LockService corresponds to one Server.

type ServerOption added in v0.8.0

type ServerOption func(*server)

ServerOption server option

func WithServerMessageFilter added in v0.8.0

func WithServerMessageFilter(filter func(*pb.Request) bool) ServerOption

WithServerMessageFilter set filter func. Requests can be modified or filtered out by the filter before they are processed by the handler.
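The effect of a message filter can be sketched with a tiny dispatcher. The `request` and `server` types are hypothetical and do not use pb or morpc. Treating a false return value as "drop the request" is an assumption, since the source does not state which return value filters a request out.

```go
package main

import "fmt"

// request is a hypothetical stand-in for *pb.Request.
type request struct {
	method string
	txnID  string
}

// server dispatches requests to handlers after running the optional filter.
type server struct {
	filter   func(*request) bool
	handlers map[string]func(*request)
}

// handle returns true if the request reached a handler.
func (s *server) handle(req *request) bool {
	// Assumption: the filter returns false to drop the request.
	if s.filter != nil && !s.filter(req) {
		return false
	}
	h, ok := s.handlers[req.method]
	if !ok {
		return false
	}
	h(req)
	return true
}

func main() {
	s := &server{
		// The filter drops "unlock" requests and rewrites the txn ID of the rest.
		filter: func(req *request) bool {
			if req.method == "unlock" {
				return false
			}
			req.txnID = "test-" + req.txnID
			return true
		},
		handlers: map[string]func(*request){
			"lock":   func(req *request) { fmt.Println("lock from", req.txnID) },
			"unlock": func(req *request) { fmt.Println("unlock from", req.txnID) },
		},
	}
	fmt.Println(s.handle(&request{method: "lock", txnID: "1"}))
	fmt.Println(s.handle(&request{method: "unlock", txnID: "1"}))
}
```

A filter like this is mostly useful in tests, for example to simulate lost messages between lock services.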
