server

package
v0.11.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2024 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by this package.
var (
	// ErrTooManyCursors is the error value with message "too many cursors".
	ErrTooManyCursors    = errors.New("too many cursors")
	// ErrInvalidHeadOffset is the error value with message "invalid head offset".
	ErrInvalidHeadOffset = errors.New("invalid head offset")
)
View Source
// InvalidEntryId is an entry id whose Term and Offset are both set to the
// WAL package's invalid markers (wal.InvalidTerm and wal.InvalidOffset).
// NOTE(review): presumably used as a "no entry" placeholder — confirm with callers.
var InvalidEntryId = &proto.EntryId{
	Term:   wal.InvalidTerm,
	Offset: wal.InvalidOffset,
}
View Source
// WrapperUpdateOperationCallback is a package-level kv.UpdateOperationCallback
// backed by a pointer to an unexported wrapperUpdateCallback value.
var WrapperUpdateOperationCallback kv.UpdateOperationCallback = &wrapperUpdateCallback{}

Functions

func ReadHeaderInt64

func ReadHeaderInt64(md metadata.MD, key string) (v int64, err error)

func SessionKey

func SessionKey(sessionId SessionId) string

func ShadowKey added in v0.3.0

func ShadowKey(sessionId SessionId, key string) string

Types

type Client

// Client is a receiver of shard-assignment messages. It is the type accepted
// by ShardAssignmentsDispatcher.RegisterForUpdates.
type Client interface {
	// Send delivers a shard assignments message to the client.
	Send(*proto.ShardAssignments) error

	// Context returns the context associated with the client.
	// NOTE(review): presumably the context of the underlying stream — confirm.
	Context() context.Context
}

type Config

// Config holds the settings used to construct a server: service addresses,
// TLS configurations, authentication options, storage directories, and
// WAL / notification / cache tuning values.
type Config struct {
	// Addresses of the public and internal services.
	// NOTE(review): presumably host:port bind addresses — confirm.
	PublicServiceAddr   string
	InternalServiceAddr string
	// TLS configurations; by name these apply to peer connections, the public
	// server, and the internal server respectively — TODO confirm.
	PeerTLS             *tls.Config
	ServerTLS           *tls.Config
	InternalServerTLS   *tls.Config
	// Address of the metrics service.
	MetricsServiceAddr  string

	// AuthOptions carries the authentication options.
	AuthOptions auth.Options

	// Directories for the data and for the write-ahead log.
	DataDir string
	WalDir  string

	// WAL retention period, WAL data-sync flag, and notifications retention
	// period.
	// NOTE(review): the exact sync semantics of WalSyncData are not visible
	// here — confirm against the WAL implementation.
	WalRetentionTime           time.Duration
	WalSyncData                bool
	NotificationsRetentionTime time.Duration

	// DbBlockCacheMB is the DB block cache size.
	// NOTE(review): the unit is presumably megabytes, per the name — confirm.
	DbBlockCacheMB int64
}

type CursorAcker

// CursorAcker is the per-cursor handle returned by
// QuorumAckTracker.NewCursorAcker.
type CursorAcker interface {
	// Ack reports the given offset as acknowledged for this cursor.
	// NOTE(review): whether offsets must be monotonically increasing is not
	// visible here — confirm with the implementation.
	Ack(offset int64)
}

type FollowerController

// FollowerController handles all the operations of a given shard's follower.
type FollowerController interface {
	io.Closer

	// NewTerm
	//
	// Node handles a new term request
	//
	// A node receives a new term request, fences itself and responds
	// with its head offset.
	//
	// When a node is fenced it cannot:
	// - accept any writes from a client.
	// - accept append from a leader.
	// - send any entries to followers if it was a leader.
	//
	// Any existing follow cursors are destroyed as is any state
	// regarding reconfigurations.
	NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error)

	// Truncate
	//
	// A node that receives a truncate request knows that it
	// has been selected as a follower. It truncates its log
	// to the indicated entry id, updates its term and changes
	// to a Follower.
	Truncate(req *proto.TruncateRequest) (*proto.TruncateResponse, error)

	// Replicate handles a log replication stream.
	// NOTE(review): presumably the stream of entries appended by the leader — confirm.
	Replicate(stream proto.OxiaLogReplication_ReplicateServer) error

	// SendSnapshot handles a snapshot stream.
	// NOTE(review): presumably a snapshot pushed by the leader to this follower — confirm.
	SendSnapshot(stream proto.OxiaLogReplication_SendSnapshotServer) error

	// GetStatus handles a status request.
	GetStatus(request *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
	// DeleteShard handles a shard deletion request.
	DeleteShard(request *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)

	// Term returns the follower's term.
	Term() int64
	// CommitOffset returns the follower's commit offset.
	CommitOffset() int64
	// Status returns the follower's serving status.
	Status() proto.ServingStatus
}

FollowerController handles all the operations of a given shard's follower.

func NewFollowerController

func NewFollowerController(config Config, namespace string, shardId int64, wf wal.Factory, kvFactory kv.Factory) (FollowerController, error)

type FollowerCursor

// FollowerCursor represents a cursor on the leader WAL that sends entries to
// a specific follower and receives a stream of acknowledgments from that
// follower.
type FollowerCursor interface {
	io.Closer

	// ShardId returns the id of the shard this cursor belongs to.
	ShardId() int64

	// LastPushed
	// The last entry that was sent to this follower
	LastPushed() int64

	// AckOffset The highest entry already acknowledged by this follower
	AckOffset() int64
}

FollowerCursor represents a cursor on the leader WAL that sends entries to a specific follower and receives a stream of acknowledgments from that follower.

func NewFollowerCursor

func NewFollowerCursor(
	follower string,
	term int64,
	namespace string,
	shardId int64,
	replicateStreamProvider ReplicateStreamProvider,
	ackTracker QuorumAckTracker,
	walObject wal.Wal,
	db kv.DB,
	ackOffset int64) (FollowerCursor, error)

type GetResult

// GetResult pairs a get response with an error. It is the element type of
// the channel returned by LeaderController.Read.
// NOTE(review): presumably exactly one of Response and Err is set — confirm.
type GetResult struct {
	// Response is the get response.
	Response *proto.GetResponse
	// Err is the error associated with the result.
	Err      error
}

type LeaderController

// LeaderController exposes the operations available on a shard's leader:
// client reads and writes, term and leadership changes, follower management,
// notifications, status, and session handling.
type LeaderController interface {
	io.Closer

	// Write handles a single write request and returns its response.
	Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
	// WriteStream handles writes arriving over a server-side stream.
	WriteStream(stream proto.OxiaClient_WriteStreamServer) error
	// Read handles a read request; results are delivered on the returned channel.
	Read(ctx context.Context, request *proto.ReadRequest) <-chan GetResult
	// List handles a list request; strings (presumably keys — confirm) are
	// delivered on the returned channel.
	List(ctx context.Context, request *proto.ListRequest) (<-chan string, error)
	// ListSliceNoMutex handles a list request and returns the result as a slice.
	// NOTE(review): the name suggests it runs without taking the controller's
	// mutex — confirm the locking contract with the implementation.
	ListSliceNoMutex(ctx context.Context, request *proto.ListRequest) ([]string, error)
	// RangeScan handles a range scan request; it returns a channel of
	// responses, a channel of errors, and an immediate error.
	RangeScan(ctx context.Context, request *proto.RangeScanRequest) (<-chan *proto.GetResponse, <-chan error, error)

	// NewTerm Handle new term requests
	NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error)

	// BecomeLeader Handles BecomeLeaderRequest from coordinator and prepares to be leader for the shard
	BecomeLeader(ctx context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)

	// AddFollower handles a request to add a follower.
	AddFollower(request *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)

	// GetNotifications handles a notifications request, using the given
	// server-side stream.
	GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error

	// GetStatus handles a status request.
	GetStatus(request *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
	// DeleteShard handles a shard deletion request.
	DeleteShard(request *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)

	// Term The current term of the leader
	Term() int64

	// Status The Status of the leader
	Status() proto.ServingStatus

	// CreateSession handles a session creation request.
	CreateSession(*proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
	// KeepAlive handles a keep-alive for the session with the given id.
	KeepAlive(sessionId int64) error
	// CloseSession handles a session close request.
	CloseSession(*proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
}

func NewLeaderController

func NewLeaderController(config Config, namespace string, shardId int64, rpcClient ReplicationRpcProvider, walFactory wal.Factory, kvFactory kv.Factory) (LeaderController, error)

type MessageWithTerm

// MessageWithTerm is implemented by any message that exposes a term.
type MessageWithTerm interface {
	// GetTerm returns the term carried by the message.
	GetTerm() int64
}

type QuorumAckTracker

// QuorumAckTracker keeps track of the head offset and commit offset of a
// shard, and is used to block until either of them advances.
type QuorumAckTracker interface {
	io.Closer

	// CommitOffset returns the current commit offset.
	CommitOffset() int64

	// WaitForCommitOffset
	// Waits for the specific entry id to be fully committed.
	// After that, invokes the function f
	WaitForCommitOffset(ctx context.Context, offset int64, f func() (*proto.WriteResponse, error)) (*proto.WriteResponse, error)

	// WaitForCommitOffsetAsync is the callback-based variant of
	// WaitForCommitOffset: it takes no context and returns nothing, and
	// receives a callback accepting a write response and an error.
	// NOTE(review): presumably the callback is invoked with the result of f
	// once the offset is committed — confirm with the implementation.
	WaitForCommitOffsetAsync(offset int64, f func() (*proto.WriteResponse, error), callback func(*proto.WriteResponse, error))

	// NextOffset returns the offset for the next entry to write
	// Note this can go ahead of the head-offset as there can be multiple operations in flight.
	NextOffset() int64

	// HeadOffset returns the current head offset.
	HeadOffset() int64

	// AdvanceHeadOffset moves the head offset to the given value.
	// NOTE(review): behavior when headOffset is lower than the current head
	// offset is not visible here — confirm.
	AdvanceHeadOffset(headOffset int64)

	// WaitForHeadOffset
	// Waits until the specified entry is written on the wal
	WaitForHeadOffset(ctx context.Context, offset int64) error

	// NewCursorAcker creates a tracker for a new cursor
	// The `ackOffset` is the previous last-acked position for the cursor
	NewCursorAcker(ackOffset int64) (CursorAcker, error)
}

QuorumAckTracker is responsible for keeping track of the head offset and commit offset of a shard:

  • Head offset: the last entry written in the local WAL of the leader
  • Commit offset: the most recent entry that is considered "fully committed", as it has received the requested number of acks from the followers

The quorum ack tracker is also used to block until the head offset or commit offset are advanced.

func NewQuorumAckTracker

func NewQuorumAckTracker(replicationFactor uint32, headOffset int64, commitOffset int64) QuorumAckTracker

type ReplicateStreamProvider

// ReplicateStreamProvider provides the client-side replication and snapshot
// streams towards a follower. It exists so that a mocked version of the gRPC
// service can be passed in.
type ReplicateStreamProvider interface {
	// GetReplicateStream returns a replication client stream for the given
	// follower, namespace, shard and term.
	GetReplicateStream(ctx context.Context, follower string, namespace string, shard int64, term int64) (proto.OxiaLogReplication_ReplicateClient, error)
	// SendSnapshot returns a snapshot client stream for the given follower,
	// namespace, shard and term.
	SendSnapshot(ctx context.Context, follower string, namespace string, shard int64, term int64) (proto.OxiaLogReplication_SendSnapshotClient, error)
}

ReplicateStreamProvider is a provider for the ReplicateStream gRPC handler. It is used to allow passing in a mocked version of the gRPC service.

type ReplicationRpcProvider

// ReplicationRpcProvider extends ReplicateStreamProvider with a Truncate
// call and is closeable.
type ReplicationRpcProvider interface {
	io.Closer
	ReplicateStreamProvider

	// Truncate sends a truncate request to the given follower and returns
	// its response.
	Truncate(follower string, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
}

func NewReplicationRpcProvider

func NewReplicationRpcProvider(tlsConf *tls.Config) ReplicationRpcProvider

type Server

// Server is the server type created by New and NewWithGrpcProvider. Its
// methods implement the RPC handlers listed in this package's documentation.
type Server struct {
	// contains filtered or unexported fields
}

func New

func New(config Config) (*Server, error)

func NewWithGrpcProvider

func NewWithGrpcProvider(config Config, provider container.GrpcProvider, replicationRpcProvider ReplicationRpcProvider) (*Server, error)

func (Server) AddFollower

func (s Server) AddFollower(c context.Context, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)

func (Server) BecomeLeader

func (s Server) BecomeLeader(c context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)

func (*Server) Close

func (s *Server) Close() error

func (Server) CloseSession

func (s Server) CloseSession(ctx context.Context, req *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)

func (Server) CreateSession

func (s Server) CreateSession(ctx context.Context, req *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)

func (Server) DeleteShard

func (s Server) DeleteShard(_ context.Context, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)

func (Server) GetNotifications

func (s Server) GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error

func (Server) GetShardAssignments

func (s Server) GetShardAssignments(req *proto.ShardAssignmentsRequest, srv proto.OxiaClient_GetShardAssignmentsServer) error

func (Server) GetStatus

func (s Server) GetStatus(_ context.Context, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)

func (*Server) InternalPort

func (s *Server) InternalPort() int

func (Server) KeepAlive

func (s Server) KeepAlive(ctx context.Context, req *proto.SessionHeartbeat) (*proto.KeepAliveResponse, error)

func (Server) List

func (s Server) List(request *proto.ListRequest, stream proto.OxiaClient_ListServer) error

func (Server) NewTerm

func (s Server) NewTerm(c context.Context, req *proto.NewTermRequest) (*proto.NewTermResponse, error)

func (Server) Port

func (s Server) Port() int

func (*Server) PublicPort

func (s *Server) PublicPort() int

func (Server) PushShardAssignments

func (s Server) PushShardAssignments(srv proto.OxiaCoordination_PushShardAssignmentsServer) error

func (Server) RangeScan added in v0.5.0

func (s Server) RangeScan(request *proto.RangeScanRequest, stream proto.OxiaClient_RangeScanServer) error

func (Server) Read

func (s Server) Read(request *proto.ReadRequest, stream proto.OxiaClient_ReadServer) error

func (Server) Replicate

func (s Server) Replicate(srv proto.OxiaLogReplication_ReplicateServer) error

func (Server) SendSnapshot

func (s Server) SendSnapshot(srv proto.OxiaLogReplication_SendSnapshotServer) error

func (Server) Truncate

func (s Server) Truncate(c context.Context, req *proto.TruncateRequest) (*proto.TruncateResponse, error)

func (Server) Write

func (s Server) Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)

func (Server) WriteStream added in v0.7.0

func (s Server) WriteStream(stream proto.OxiaClient_WriteStreamServer) error

type SessionId

// SessionId is the identifier of a session, represented as an int64.
type SessionId int64

func KeyToId

func KeyToId(key string) (SessionId, error)

type SessionManager

// SessionManager handles the session lifecycle: creation, keep-alive and
// closing.
type SessionManager interface {
	io.Closer
	// CreateSession handles a session creation request.
	CreateSession(request *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
	// KeepAlive handles a keep-alive for the session with the given id.
	KeepAlive(sessionId int64) error
	// CloseSession handles a session close request.
	CloseSession(request *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
	// Initialize prepares the session manager for use.
	// NOTE(review): what state it loads or restores is not visible here — confirm.
	Initialize() error
}

func NewSessionManager

func NewSessionManager(ctx context.Context, namespace string, shardId int64, controller *leaderController) SessionManager

type ShardAssignmentsDispatcher

// ShardAssignmentsDispatcher receives pushed shard assignments and lets
// clients register for assignment updates.
type ShardAssignmentsDispatcher interface {
	io.Closer
	// Initialized reports whether the dispatcher has been initialized.
	// NOTE(review): presumably true once the first assignments have been
	// received — confirm.
	Initialized() bool
	// PushShardAssignments handles a server-side stream of pushed shard
	// assignments.
	PushShardAssignments(stream proto.OxiaCoordination_PushShardAssignmentsServer) error
	// RegisterForUpdates registers the client for shard-assignment updates
	// for the given request.
	RegisterForUpdates(req *proto.ShardAssignmentsRequest, client Client) error
}

func NewShardAssignmentDispatcher

func NewShardAssignmentDispatcher(healthServer *health.Server) ShardAssignmentsDispatcher

func NewStandaloneShardAssignmentDispatcher

func NewStandaloneShardAssignmentDispatcher(numShards uint32) ShardAssignmentsDispatcher

type ShardsDirector

// ShardsDirector looks up, creates and deletes the leader and follower
// controllers of shards.
type ShardsDirector interface {
	io.Closer

	// GetLeader returns the leader controller for the given shard, or an error.
	GetLeader(shardId int64) (LeaderController, error)
	// GetFollower returns the follower controller for the given shard, or an error.
	GetFollower(shardId int64) (FollowerController, error)

	// GetOrCreateLeader returns the leader controller for the given
	// namespace and shard, creating it if needed.
	GetOrCreateLeader(namespace string, shardId int64) (LeaderController, error)
	// GetOrCreateFollower returns the follower controller for the given
	// namespace, shard and term, creating it if needed.
	GetOrCreateFollower(namespace string, shardId int64, term int64) (FollowerController, error)

	// DeleteShard handles a shard deletion request.
	DeleteShard(req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
}

func NewShardsDirector

func NewShardsDirector(config Config, walFactory wal.Factory, kvFactory kv.Factory, provider ReplicationRpcProvider) ShardsDirector

type Standalone

// Standalone is the type created by NewStandalone from a StandaloneConfig.
// NOTE(review): presumably a single-process server instance — confirm.
type Standalone struct {
	// contains filtered or unexported fields
}

func NewStandalone

func NewStandalone(config StandaloneConfig) (*Standalone, error)

func (*Standalone) Close

func (s *Standalone) Close() error

func (*Standalone) RpcPort

func (s *Standalone) RpcPort() int

type StandaloneConfig

// StandaloneConfig is the configuration accepted by NewStandalone. It embeds
// Config and adds standalone-specific settings.
type StandaloneConfig struct {
	Config

	// NumShards is the number of shards.
	NumShards            uint32
	// NotificationsEnabled toggles notifications.
	NotificationsEnabled bool
}

func NewTestConfig

func NewTestConfig(dir string) StandaloneConfig

Directories

Path Synopsis
crc
wal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL