Documentation ¶
Index ¶
- Variables
- func ReadHeaderInt64(md metadata.MD, key string) (v int64, err error)
- func SessionKey(sessionId SessionId) string
- func ShadowKey(sessionId SessionId, key string) string
- type Client
- type Config
- type CursorAcker
- type FollowerController
- type FollowerCursor
- type GetResult
- type LeaderController
- type MessageWithTerm
- type QuorumAckTracker
- type ReplicateStreamProvider
- type ReplicationRpcProvider
- type Server
- func (s Server) AddFollower(c context.Context, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
- func (s Server) BecomeLeader(c context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
- func (s *Server) Close() error
- func (s Server) CloseSession(ctx context.Context, req *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
- func (s Server) CreateSession(ctx context.Context, req *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
- func (s Server) DeleteShard(_ context.Context, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
- func (s Server) GetNotifications(req *proto.NotificationsRequest, ...) error
- func (s Server) GetShardAssignments(req *proto.ShardAssignmentsRequest, ...) error
- func (s Server) GetStatus(_ context.Context, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
- func (s *Server) InternalPort() int
- func (s Server) KeepAlive(ctx context.Context, req *proto.SessionHeartbeat) (*proto.KeepAliveResponse, error)
- func (s Server) List(request *proto.ListRequest, stream proto.OxiaClient_ListServer) error
- func (s Server) NewTerm(c context.Context, req *proto.NewTermRequest) (*proto.NewTermResponse, error)
- func (s Server) Port() int
- func (s *Server) PublicPort() int
- func (s Server) PushShardAssignments(srv proto.OxiaCoordination_PushShardAssignmentsServer) error
- func (s Server) RangeScan(request *proto.RangeScanRequest, stream proto.OxiaClient_RangeScanServer) error
- func (s Server) Read(request *proto.ReadRequest, stream proto.OxiaClient_ReadServer) error
- func (s Server) Replicate(srv proto.OxiaLogReplication_ReplicateServer) error
- func (s Server) SendSnapshot(srv proto.OxiaLogReplication_SendSnapshotServer) error
- func (s Server) Truncate(c context.Context, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
- func (s Server) Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
- func (s Server) WriteStream(stream proto.OxiaClient_WriteStreamServer) error
- type SessionId
- type SessionManager
- type ShardAssignmentsDispatcher
- type ShardsDirector
- type Standalone
- type StandaloneConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrTooManyCursors = errors.New("too many cursors") ErrInvalidHeadOffset = errors.New("invalid head offset") )
View Source
var InvalidEntryId = &proto.EntryId{ Term: wal.InvalidTerm, Offset: wal.InvalidOffset, }
View Source
var WrapperUpdateOperationCallback kv.UpdateOperationCallback = &wrapperUpdateCallback{}
Functions ¶
func SessionKey ¶
Types ¶
type Client ¶
type Client interface { Send(*proto.ShardAssignments) error Context() context.Context }
type Config ¶
type Config struct { PublicServiceAddr string InternalServiceAddr string PeerTLS *tls.Config ServerTLS *tls.Config InternalServerTLS *tls.Config MetricsServiceAddr string AuthOptions auth.Options DataDir string WalDir string WalRetentionTime time.Duration WalSyncData bool NotificationsRetentionTime time.Duration DbBlockCacheMB int64 }
type CursorAcker ¶
type CursorAcker interface {
Ack(offset int64)
}
type FollowerController ¶
type FollowerController interface { io.Closer // NewTerm // // Node handles a new term request // // A node receives a new term request, fences itself and responds // with its head offset. // // When a node is fenced it cannot: // - accept any writes from a client. // - accept append from a leader. // - send any entries to followers if it was a leader. // // Any existing follower cursors are destroyed as is any state // regarding reconfigurations. NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error) // Truncate // // A node that receives a truncate request knows that it // has been selected as a follower. It truncates its log // to the indicated entry id, updates its term and changes // to a Follower. Truncate(req *proto.TruncateRequest) (*proto.TruncateResponse, error) Replicate(stream proto.OxiaLogReplication_ReplicateServer) error SendSnapshot(stream proto.OxiaLogReplication_SendSnapshotServer) error GetStatus(request *proto.GetStatusRequest) (*proto.GetStatusResponse, error) DeleteShard(request *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error) Term() int64 CommitOffset() int64 Status() proto.ServingStatus }
FollowerController handles all the operations of a given shard's follower.
type FollowerCursor ¶
type FollowerCursor interface { io.Closer ShardId() int64 // LastPushed // The last entry that was sent to this follower LastPushed() int64 // AckOffset The highest entry already acknowledged by this follower AckOffset() int64 }
FollowerCursor represents a cursor on the leader WAL that sends entries to a specific follower and receives a stream of acknowledgments from that follower.
func NewFollowerCursor ¶
func NewFollowerCursor( follower string, term int64, namespace string, shardId int64, replicateStreamProvider ReplicateStreamProvider, ackTracker QuorumAckTracker, walObject wal.Wal, db kv.DB, ackOffset int64) (FollowerCursor, error)
type GetResult ¶
type GetResult struct { Response *proto.GetResponse Err error }
type LeaderController ¶
type LeaderController interface { io.Closer Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error) WriteStream(stream proto.OxiaClient_WriteStreamServer) error Read(ctx context.Context, request *proto.ReadRequest) <-chan GetResult List(ctx context.Context, request *proto.ListRequest) (<-chan string, error) ListSliceNoMutex(ctx context.Context, request *proto.ListRequest) ([]string, error) RangeScan(ctx context.Context, request *proto.RangeScanRequest) (<-chan *proto.GetResponse, <-chan error, error) // NewTerm Handle new term requests NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error) // BecomeLeader Handles BecomeLeaderRequest from coordinator and prepares to be leader for the shard BecomeLeader(ctx context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error) AddFollower(request *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error) GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error GetStatus(request *proto.GetStatusRequest) (*proto.GetStatusResponse, error) DeleteShard(request *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error) // Term The current term of the leader Term() int64 // Status The Status of the leader Status() proto.ServingStatus CreateSession(*proto.CreateSessionRequest) (*proto.CreateSessionResponse, error) KeepAlive(sessionId int64) error CloseSession(*proto.CloseSessionRequest) (*proto.CloseSessionResponse, error) }
func NewLeaderController ¶
func NewLeaderController(config Config, namespace string, shardId int64, rpcClient ReplicationRpcProvider, walFactory wal.Factory, kvFactory kv.Factory) (LeaderController, error)
type MessageWithTerm ¶
type MessageWithTerm interface {
GetTerm() int64
}
type QuorumAckTracker ¶
type QuorumAckTracker interface { io.Closer CommitOffset() int64 // WaitForCommitOffset // Waits for the specific entry id to be fully committed. // After that, invokes the function f WaitForCommitOffset(ctx context.Context, offset int64, f func() (*proto.WriteResponse, error)) (*proto.WriteResponse, error) WaitForCommitOffsetAsync(offset int64, f func() (*proto.WriteResponse, error), callback func(*proto.WriteResponse, error)) // NextOffset returns the offset for the next entry to write // Note this can go ahead of the head-offset as there can be multiple operations in flight. NextOffset() int64 HeadOffset() int64 AdvanceHeadOffset(headOffset int64) // WaitForHeadOffset // Waits until the specified entry is written on the wal WaitForHeadOffset(ctx context.Context, offset int64) error // NewCursorAcker creates a tracker for a new cursor // The `ackOffset` is the previous last-acked position for the cursor NewCursorAcker(ackOffset int64) (CursorAcker, error) }
QuorumAckTracker is responsible for keeping track of the head offset and commit offset of a shard:
- Head offset: the last entry written in the local WAL of the leader
- Commit offset: the most recent entry that is considered "fully committed", as it has received the requested number of acks from the followers
The quorum ack tracker is also used to block until the head offset or the commit offset has advanced.
func NewQuorumAckTracker ¶
func NewQuorumAckTracker(replicationFactor uint32, headOffset int64, commitOffset int64) QuorumAckTracker
type ReplicateStreamProvider ¶
type ReplicateStreamProvider interface { GetReplicateStream(ctx context.Context, follower string, namespace string, shard int64, term int64) (proto.OxiaLogReplication_ReplicateClient, error) SendSnapshot(ctx context.Context, follower string, namespace string, shard int64, term int64) (proto.OxiaLogReplication_SendSnapshotClient, error) }
ReplicateStreamProvider is a provider for the ReplicateStream gRPC handler. It is used to allow passing in a mocked version of the gRPC service.
type ReplicationRpcProvider ¶
type ReplicationRpcProvider interface { io.Closer ReplicateStreamProvider Truncate(follower string, req *proto.TruncateRequest) (*proto.TruncateResponse, error) }
func NewReplicationRpcProvider ¶
func NewReplicationRpcProvider(tlsConf *tls.Config) ReplicationRpcProvider
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewWithGrpcProvider ¶
func NewWithGrpcProvider(config Config, provider container.GrpcProvider, replicationRpcProvider ReplicationRpcProvider) (*Server, error)
func (Server) AddFollower ¶
func (s Server) AddFollower(c context.Context, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
func (Server) BecomeLeader ¶
func (s Server) BecomeLeader(c context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
func (Server) CloseSession ¶
func (s Server) CloseSession(ctx context.Context, req *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
func (Server) CreateSession ¶
func (s Server) CreateSession(ctx context.Context, req *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
func (Server) DeleteShard ¶
func (s Server) DeleteShard(_ context.Context, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
func (Server) GetNotifications ¶
func (s Server) GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error
func (Server) GetShardAssignments ¶
func (s Server) GetShardAssignments(req *proto.ShardAssignmentsRequest, srv proto.OxiaClient_GetShardAssignmentsServer) error
func (Server) GetStatus ¶
func (s Server) GetStatus(_ context.Context, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
func (*Server) InternalPort ¶
func (Server) KeepAlive ¶
func (s Server) KeepAlive(ctx context.Context, req *proto.SessionHeartbeat) (*proto.KeepAliveResponse, error)
func (Server) List ¶
func (s Server) List(request *proto.ListRequest, stream proto.OxiaClient_ListServer) error
func (Server) NewTerm ¶
func (s Server) NewTerm(c context.Context, req *proto.NewTermRequest) (*proto.NewTermResponse, error)
func (*Server) PublicPort ¶
func (Server) PushShardAssignments ¶
func (s Server) PushShardAssignments(srv proto.OxiaCoordination_PushShardAssignmentsServer) error
func (Server) RangeScan ¶ added in v0.5.0
func (s Server) RangeScan(request *proto.RangeScanRequest, stream proto.OxiaClient_RangeScanServer) error
func (Server) Read ¶
func (s Server) Read(request *proto.ReadRequest, stream proto.OxiaClient_ReadServer) error
func (Server) Replicate ¶
func (s Server) Replicate(srv proto.OxiaLogReplication_ReplicateServer) error
func (Server) SendSnapshot ¶
func (s Server) SendSnapshot(srv proto.OxiaLogReplication_SendSnapshotServer) error
func (Server) Truncate ¶
func (s Server) Truncate(c context.Context, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
func (Server) Write ¶
func (s Server) Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
func (Server) WriteStream ¶ added in v0.7.0
func (s Server) WriteStream(stream proto.OxiaClient_WriteStreamServer) error
type SessionManager ¶
type SessionManager interface { io.Closer CreateSession(request *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error) KeepAlive(sessionId int64) error CloseSession(request *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error) Initialize() error }
func NewSessionManager ¶
func NewSessionManager(ctx context.Context, namespace string, shardId int64, controller *leaderController) SessionManager
type ShardAssignmentsDispatcher ¶
type ShardAssignmentsDispatcher interface { io.Closer Initialized() bool PushShardAssignments(stream proto.OxiaCoordination_PushShardAssignmentsServer) error RegisterForUpdates(req *proto.ShardAssignmentsRequest, client Client) error }
func NewShardAssignmentDispatcher ¶
func NewShardAssignmentDispatcher(healthServer *health.Server) ShardAssignmentsDispatcher
func NewStandaloneShardAssignmentDispatcher ¶
func NewStandaloneShardAssignmentDispatcher(numShards uint32) ShardAssignmentsDispatcher
type ShardsDirector ¶
type ShardsDirector interface { io.Closer GetLeader(shardId int64) (LeaderController, error) GetFollower(shardId int64) (FollowerController, error) GetOrCreateLeader(namespace string, shardId int64) (LeaderController, error) GetOrCreateFollower(namespace string, shardId int64, term int64) (FollowerController, error) DeleteShard(req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error) }
func NewShardsDirector ¶
func NewShardsDirector(config Config, walFactory wal.Factory, kvFactory kv.Factory, provider ReplicationRpcProvider) ShardsDirector
type Standalone ¶
type Standalone struct {
// contains filtered or unexported fields
}
func NewStandalone ¶
func NewStandalone(config StandaloneConfig) (*Standalone, error)
func (*Standalone) Close ¶
func (s *Standalone) Close() error
func (*Standalone) RpcPort ¶
func (s *Standalone) RpcPort() int
type StandaloneConfig ¶
func NewTestConfig ¶
func NewTestConfig(dir string) StandaloneConfig
Source Files ¶
- assignment_dispatcher.go
- constants.go
- follower_controller.go
- follower_cursor.go
- internal_rpc_server.go
- leader_controller.go
- notifications_dispatcher.go
- public_rpc_server.go
- quorum_ack_tracker.go
- rpc_provider.go
- secondary_indexes.go
- server.go
- session.go
- session_manager.go
- shards_director.go
- standalone.go
Click to show internal directories.
Click to hide internal directories.