Documentation ¶
Index ¶
- Constants
- Variables
- func NewReportCollector(helper ReportCollectorHelper, rpcTimeout time.Duration, tmStub *telemetryStub, ...) *reportCollector
- func NewReporterClientFactory(grpcDialOptions ...grpc.DialOption) *reporterClientFactory
- type DummyStorageNodeClient
- func (r *DummyStorageNodeClient) AddLogStreamReplica(context.Context, types.TopicID, types.LogStreamID, string) (snpb.LogStreamReplicaMetadataDescriptor, error)
- func (r *DummyStorageNodeClient) Close() error
- func (r *DummyStorageNodeClient) Commit(cr snpb.CommitRequest) error
- func (r *DummyStorageNodeClient) CommitBatch(cbr snpb.CommitBatchRequest) error
- func (r *DummyStorageNodeClient) DisableCommit()
- func (r *DummyStorageNodeClient) DisableReport()
- func (r *DummyStorageNodeClient) EnableCommmit()
- func (r *DummyStorageNodeClient) EnableReport()
- func (r *DummyStorageNodeClient) GetMetadata(context.Context) (*snpb.StorageNodeMetadataDescriptor, error)
- func (r *DummyStorageNodeClient) GetReport() (*snpb.GetReportResponse, error)
- func (r *DummyStorageNodeClient) RemoveLogStream(context.Context, types.TopicID, types.LogStreamID) error
- func (r *DummyStorageNodeClient) Seal(context.Context, types.TopicID, types.LogStreamID, types.GLSN) (varlogpb.LogStreamStatus, types.GLSN, error)
- func (r *DummyStorageNodeClient) SetCommitDelay(d time.Duration)
- func (r *DummyStorageNodeClient) SetReportDelay(d time.Duration)
- func (r *DummyStorageNodeClient) Sync(context.Context, types.TopicID, types.LogStreamID, types.StorageNodeID, string, ...) (*snpb.SyncStatus, error)
- func (r *DummyStorageNodeClient) Target() varlogpb.StorageNode
- func (r *DummyStorageNodeClient) Trim(context.Context, types.TopicID, types.GLSN) (map[types.LogStreamID]error, error)
- func (r *DummyStorageNodeClient) Unseal(context.Context, types.TopicID, types.LogStreamID, []varlogpb.LogStreamReplica) error
- type DummyStorageNodeClientFactory
- func (fac *DummyStorageNodeClientFactory) GetManagementClient(ctx context.Context, _ types.ClusterID, address string, _ *zap.Logger) (client.StorageNodeManagementClient, error)
- func (fac *DummyStorageNodeClientFactory) GetReporterClient(ctx context.Context, sn *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
- type DummyStorageNodeClientStatus
- type EmptyStorageNodeClient
- func (rc *EmptyStorageNodeClient) AddLogStreamReplica(context.Context, types.TopicID, types.LogStreamID, string) (snpb.LogStreamReplicaMetadataDescriptor, error)
- func (rc *EmptyStorageNodeClient) Close() error
- func (rc *EmptyStorageNodeClient) Commit(snpb.CommitRequest) error
- func (rc *EmptyStorageNodeClient) CommitBatch(snpb.CommitBatchRequest) error
- func (rc *EmptyStorageNodeClient) GetMetadata(context.Context) (*snpb.StorageNodeMetadataDescriptor, error)
- func (rc *EmptyStorageNodeClient) GetReport() (*snpb.GetReportResponse, error)
- func (rc *EmptyStorageNodeClient) RemoveLogStream(context.Context, types.TopicID, types.LogStreamID) error
- func (rc *EmptyStorageNodeClient) Seal(context.Context, types.TopicID, types.LogStreamID, types.GLSN) (varlogpb.LogStreamStatus, types.GLSN, error)
- func (rc *EmptyStorageNodeClient) Sync(context.Context, types.TopicID, types.LogStreamID, types.StorageNodeID, string, ...) (*snpb.SyncStatus, error)
- func (rc *EmptyStorageNodeClient) Target() varlogpb.StorageNode
- func (rc *EmptyStorageNodeClient) Trim(context.Context, types.TopicID, types.GLSN) (map[types.LogStreamID]error, error)
- func (rc *EmptyStorageNodeClient) Unseal(context.Context, types.TopicID, types.LogStreamID, []varlogpb.LogStreamReplica) error
- type EmptyStorageNodeClientFactory
- func (rcf *EmptyStorageNodeClientFactory) GetManagementClient(context.Context, types.ClusterID, string, *zap.Logger) (client.StorageNodeManagementClient, error)
- func (rcf *EmptyStorageNodeClientFactory) GetReporterClient(context.Context, *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
- type Management
- type ManagementService
- func (s *ManagementService) AddPeer(ctx context.Context, req *mrpb.AddPeerRequest) (*types.Empty, error)
- func (s *ManagementService) GetClusterInfo(ctx context.Context, req *mrpb.GetClusterInfoRequest) (*mrpb.GetClusterInfoResponse, error)
- func (s *ManagementService) Register(server *grpc.Server)
- func (s *ManagementService) RemovePeer(ctx context.Context, req *mrpb.RemovePeerRequest) (*types.Empty, error)
- type Membership
- type MetadataRepository
- type MetadataRepositoryService
- func (s *MetadataRepositoryService) GetMetadata(ctx context.Context, req *mrpb.GetMetadataRequest) (*mrpb.GetMetadataResponse, error)
- func (s *MetadataRepositoryService) Register(server *grpc.Server)
- func (s *MetadataRepositoryService) RegisterLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)
- func (s *MetadataRepositoryService) RegisterStorageNode(ctx context.Context, req *mrpb.StorageNodeRequest) (*types.Empty, error)
- func (s *MetadataRepositoryService) RegisterTopic(ctx context.Context, req *mrpb.TopicRequest) (*types.Empty, error)
- func (s *MetadataRepositoryService) Seal(ctx context.Context, req *mrpb.SealRequest) (*mrpb.SealResponse, error)
- func (s *MetadataRepositoryService) UnregisterLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)
- func (s *MetadataRepositoryService) UnregisterStorageNode(ctx context.Context, req *mrpb.StorageNodeRequest) (*types.Empty, error)
- func (s *MetadataRepositoryService) UnregisterTopic(ctx context.Context, req *mrpb.TopicRequest) (*types.Empty, error)
- func (s *MetadataRepositoryService) Unseal(ctx context.Context, req *mrpb.UnsealRequest) (*mrpb.UnsealResponse, error)
- func (s *MetadataRepositoryService) UpdateLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)
- type MetadataStorage
- func (ms *MetadataStorage) AddPeer(nodeID types.NodeID, url string, isLearner bool, cs *raftpb.ConfState, ...) error
- func (ms *MetadataStorage) AppendLogStreamCommitHistory(cr *mrpb.LogStreamCommitResults)
- func (ms *MetadataStorage) ApplySnapshot(snap []byte, snapConfState *raftpb.ConfState, snapIndex uint64) error
- func (ms *MetadataStorage) Clear()
- func (ms *MetadataStorage) Close()
- func (ms *MetadataStorage) GetFirstCommitResults() *mrpb.LogStreamCommitResults
- func (ms *MetadataStorage) GetLastCommitResults() *mrpb.LogStreamCommitResults
- func (ms *MetadataStorage) GetLastCommitVersion() types.Version
- func (ms *MetadataStorage) GetLogStreamCommitResults() []*mrpb.LogStreamCommitResults
- func (ms *MetadataStorage) GetLogStreams() []*varlogpb.LogStreamDescriptor
- func (ms *MetadataStorage) GetMetadata() *varlogpb.MetadataDescriptor
- func (ms *MetadataStorage) GetMinCommitVersion() types.Version
- func (ms *MetadataStorage) GetPeers() *mrpb.MetadataRepositoryDescriptor_PeerDescriptorMap
- func (ms *MetadataStorage) GetSnapshot() ([]byte, *raftpb.ConfState, uint64)
- func (ms *MetadataStorage) GetSnapshotIndex() uint64
- func (ms *MetadataStorage) GetSortedTopicLogStreamIDs() []TopicLSID
- func (ms *MetadataStorage) GetStorageNodes() []*varlogpb.StorageNodeDescriptor
- func (ms *MetadataStorage) IsLearner(nodeID types.NodeID) bool
- func (ms *MetadataStorage) IsMember(nodeID types.NodeID) bool
- func (ms *MetadataStorage) Leader() types.NodeID
- func (ms *MetadataStorage) LookupEndpoint(nodeID types.NodeID) string
- func (ms *MetadataStorage) LookupLogStream(lsID types.LogStreamID) *varlogpb.LogStreamDescriptor
- func (ms *MetadataStorage) LookupNextCommitResults(ver types.Version) (*mrpb.LogStreamCommitResults, error)
- func (ms *MetadataStorage) LookupStorageNode(snID types.StorageNodeID) *varlogpb.StorageNodeDescriptor
- func (ms *MetadataStorage) LookupUncommitReport(lsID types.LogStreamID, snID types.StorageNodeID) (snpb.LogStreamUncommitReport, bool)
- func (ms *MetadataStorage) LookupUncommitReports(lsID types.LogStreamID) *mrpb.LogStreamUncommitReports
- func (ms *MetadataStorage) NumUpdateSinceCommit() uint64
- func (ms *MetadataStorage) RecoverStateMachine(stateMachine *mrpb.MetadataRepositoryDescriptor, ...) error
- func (ms *MetadataStorage) RegisterEndpoint(nodeID types.NodeID, url string, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) RegisterLogStream(ls *varlogpb.LogStreamDescriptor, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) RegisterStorageNode(sn *varlogpb.StorageNodeDescriptor, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) RegisterTopic(topic *varlogpb.TopicDescriptor, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) RemovePeer(nodeID types.NodeID, cs *raftpb.ConfState, appliedIndex uint64) error
- func (ms *MetadataStorage) ResetUpdateSinceCommit()
- func (ms *MetadataStorage) Run()
- func (ms *MetadataStorage) SealLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) SealingLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) SetLeader(nodeID types.NodeID)
- func (ms *MetadataStorage) TrimLogStreamCommitHistory(ver types.Version) error
- func (ms *MetadataStorage) UnregisterLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) UnregisterStorageNode(snID types.StorageNodeID, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) UnregisterTopic(topicID types.TopicID, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) UnsealLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) UpdateAppliedIndex(appliedIndex uint64)
- func (ms *MetadataStorage) UpdateLogStream(ls *varlogpb.LogStreamDescriptor, nodeIndex, requestIndex uint64) error
- func (ms *MetadataStorage) UpdateUncommitReport(lsID types.LogStreamID, snID types.StorageNodeID, ...)
- type Option
- func JoinCluster() Option
- func WithClusterID(cid types.ClusterID) Option
- func WithCommitTick(commitTick time.Duration) Option
- func WithDebugAddress(debugAddr string) Option
- func WithLogger(logger *zap.Logger) Option
- func WithMaxLogStreamsCountPerTopic(maxLogStreamsCountPerTopic int32) Option
- func WithMaxSnapPurgeCount(maxSnapPurgeCount uint) Option
- func WithMaxTopicsCount(maxTopicsCount int32) Option
- func WithMaxWALPurgeCount(maxWalPurgeCount uint) Option
- func WithPeers(peers ...string) Option
- func WithRPCAddress(rpcAddr string) Option
- func WithRPCTimeout(rpcTimeout time.Duration) Option
- func WithRaftAddress(raftAddr string) Option
- func WithRaftDirectory(raftDir string) Option
- func WithRaftTick(raftTick time.Duration) Option
- func WithReplicationFactor(replicationFactor int) Option
- func WithReportCommitterReadBufferSize(readBufferSize int) Option
- func WithReportCommitterWriteBufferSize(writeBufferSize int) Option
- func WithReporterClientFactory(reporterClientFac ReporterClientFactory) Option
- func WithSnapshotCatchUpCount(snapCatchUpCount uint64) Option
- func WithSnapshotCount(snapshotCount uint64) Option
- func WithTelemetryCollectorEndpoint(telemetryCollectorEndpoint string) Option
- func WithTelemetryCollectorName(telemetryCollectorName string) Option
- type RaftLogger
- type RaftMetadataRepository
- func (mr *RaftMetadataRepository) AddPeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID, url string) error
- func (mr *RaftMetadataRepository) Close() error
- func (mr *RaftMetadataRepository) GetClusterInfo(context.Context, types.ClusterID) (*mrpb.ClusterInfo, error)
- func (mr *RaftMetadataRepository) GetLastCommitResults() *mrpb.LogStreamCommitResults
- func (mr *RaftMetadataRepository) GetLastCommitVersion() types.Version
- func (mr *RaftMetadataRepository) GetMetadata(context.Context) (*varlogpb.MetadataDescriptor, error)
- func (mr *RaftMetadataRepository) GetOldestCommitVersion() types.Version
- func (mr *RaftMetadataRepository) GetReportCount() uint64
- func (mr *RaftMetadataRepository) GetReporterClient(ctx context.Context, sn *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
- func (mr *RaftMetadataRepository) GetServerAddr() string
- func (mr *RaftMetadataRepository) IsLearner() bool
- func (mr *RaftMetadataRepository) IsMember() bool
- func (mr *RaftMetadataRepository) LookupNextCommitResults(ver types.Version) (*mrpb.LogStreamCommitResults, error)
- func (mr *RaftMetadataRepository) ProposeReport(snID types.StorageNodeID, ur []snpb.LogStreamUncommitReport) error
- func (mr *RaftMetadataRepository) RegisterLogStream(ctx context.Context, ls *varlogpb.LogStreamDescriptor) error
- func (mr *RaftMetadataRepository) RegisterStorageNode(ctx context.Context, sn *varlogpb.StorageNodeDescriptor) error
- func (mr *RaftMetadataRepository) RegisterTopic(ctx context.Context, topicID types.TopicID) error
- func (mr *RaftMetadataRepository) RemovePeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID) error
- func (mr *RaftMetadataRepository) Run()
- func (mr *RaftMetadataRepository) Seal(ctx context.Context, lsID types.LogStreamID) (types.GLSN, error)
- func (mr *RaftMetadataRepository) UnregisterLogStream(ctx context.Context, lsID types.LogStreamID) error
- func (mr *RaftMetadataRepository) UnregisterStorageNode(ctx context.Context, snID types.StorageNodeID) error
- func (mr *RaftMetadataRepository) UnregisterTopic(ctx context.Context, topicID types.TopicID) error
- func (mr *RaftMetadataRepository) Unseal(ctx context.Context, lsID types.LogStreamID) error
- func (mr *RaftMetadataRepository) UpdateLogStream(ctx context.Context, ls *varlogpb.LogStreamDescriptor) error
- func (mr *RaftMetadataRepository) Wait()
- type ReportCollector
- type ReportCollectorHelper
- type ReporterClientFactory
- type SnapshotGetter
- type TopicLSID
Constants ¶
View Source
const (
	DefaultClusterID = types.ClusterID(1)
	DefaultRPCBindAddress = "0.0.0.0:9092"
	DefaultDebugAddress = "0.0.0.0:9099"
	DefaultRaftPort = 10000
	DefaultSnapshotCount uint64 = 10000
	DefaultSnapshotCatchUpCount uint64 = 10000
	DefaultSnapshotPurgeCount uint = 10
	DefaultWalPurgeCount uint = 10
	DefaultLogReplicationFactor int = 1
	DefaultProposeTimeout = 100 * time.Millisecond
	DefaultRaftTick = 100 * time.Millisecond
	DefaultRPCTimeout = 100 * time.Millisecond
	DefaultCommitTick = 1 * time.Millisecond
	DefaultPromoteTick = 100 * time.Millisecond
	DefaultRaftDir string = "raftdata"
	DefaultLogDir string = "log"
	DefaultTelemetryCollectorName string = "nop"
	DefaultTelmetryCollectorEndpoint string = "localhost:55680"
	DefaultReportCommitterReadBufferSize = 32 * 1024 // 32KB
	DefaultReportCommitterWriteBufferSize = 32 * 1024 // 32KB
	DefaultMaxTopicsCount = -1
	DefaultMaxLogStreamsCountPerTopic = -1
	UnusedRequestIndex uint64 = 0
)
View Source
const DefaultCatchupRefreshTime = 3 * time.Millisecond
View Source
const DefaultDelay = 500 * time.Microsecond
View Source
const DefaultReportRefreshTime = time.Second
View Source
const DefaultSampleReportsRate = 1000
View Source
const (
PromoteRate = 0.9
)
Variables ¶
View Source
var (
DefaultRaftAddress = makeDefaultRaftAddress()
)
Functions ¶
func NewReportCollector ¶
func NewReportCollector(helper ReportCollectorHelper, rpcTimeout time.Duration, tmStub *telemetryStub, logger *zap.Logger) *reportCollector
func NewReporterClientFactory ¶
func NewReporterClientFactory(grpcDialOptions ...grpc.DialOption) *reporterClientFactory
Types ¶
type DummyStorageNodeClient ¶
type DummyStorageNodeClient struct {
// contains filtered or unexported fields
}
func (*DummyStorageNodeClient) AddLogStreamReplica ¶
func (r *DummyStorageNodeClient) AddLogStreamReplica(context.Context, types.TopicID, types.LogStreamID, string) (snpb.LogStreamReplicaMetadataDescriptor, error)
func (*DummyStorageNodeClient) Close ¶
func (r *DummyStorageNodeClient) Close() error
func (*DummyStorageNodeClient) Commit ¶
func (r *DummyStorageNodeClient) Commit(cr snpb.CommitRequest) error
func (*DummyStorageNodeClient) CommitBatch ¶ added in v0.7.0
func (r *DummyStorageNodeClient) CommitBatch(cbr snpb.CommitBatchRequest) error
func (*DummyStorageNodeClient) DisableCommit ¶ added in v0.10.0
func (r *DummyStorageNodeClient) DisableCommit()
func (*DummyStorageNodeClient) DisableReport ¶
func (r *DummyStorageNodeClient) DisableReport()
func (*DummyStorageNodeClient) EnableCommmit ¶ added in v0.10.0
func (r *DummyStorageNodeClient) EnableCommmit()
func (*DummyStorageNodeClient) EnableReport ¶
func (r *DummyStorageNodeClient) EnableReport()
func (*DummyStorageNodeClient) GetMetadata ¶
func (r *DummyStorageNodeClient) GetMetadata(context.Context) (*snpb.StorageNodeMetadataDescriptor, error)
func (*DummyStorageNodeClient) GetReport ¶
func (r *DummyStorageNodeClient) GetReport() (*snpb.GetReportResponse, error)
func (*DummyStorageNodeClient) RemoveLogStream ¶
func (r *DummyStorageNodeClient) RemoveLogStream(context.Context, types.TopicID, types.LogStreamID) error
func (*DummyStorageNodeClient) Seal ¶
func (r *DummyStorageNodeClient) Seal(context.Context, types.TopicID, types.LogStreamID, types.GLSN) (varlogpb.LogStreamStatus, types.GLSN, error)
func (*DummyStorageNodeClient) SetCommitDelay ¶
func (r *DummyStorageNodeClient) SetCommitDelay(d time.Duration)
func (*DummyStorageNodeClient) SetReportDelay ¶
func (r *DummyStorageNodeClient) SetReportDelay(d time.Duration)
func (*DummyStorageNodeClient) Sync ¶
func (r *DummyStorageNodeClient) Sync(context.Context, types.TopicID, types.LogStreamID, types.StorageNodeID, string, types.GLSN) (*snpb.SyncStatus, error)
func (*DummyStorageNodeClient) Target ¶
func (r *DummyStorageNodeClient) Target() varlogpb.StorageNode
func (*DummyStorageNodeClient) Unseal ¶
func (r *DummyStorageNodeClient) Unseal(context.Context, types.TopicID, types.LogStreamID, []varlogpb.LogStreamReplica) error
type DummyStorageNodeClientFactory ¶
type DummyStorageNodeClientFactory struct {
// contains filtered or unexported fields
}
func NewDummyStorageNodeClientFactory ¶
func NewDummyStorageNodeClientFactory(nrLogStreams int, manual bool) *DummyStorageNodeClientFactory
func (*DummyStorageNodeClientFactory) GetManagementClient ¶
func (*DummyStorageNodeClientFactory) GetReporterClient ¶
func (fac *DummyStorageNodeClientFactory) GetReporterClient(ctx context.Context, sn *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
type DummyStorageNodeClientStatus ¶
type DummyStorageNodeClientStatus int32
const (
	DummyStorageNodeClientStatusRunning DummyStorageNodeClientStatus = iota
	DummyStorageNodeClientStatusClosed
	DummyStorageNodeClientStatusCrash
)
type EmptyStorageNodeClient ¶
type EmptyStorageNodeClient struct { }
func (*EmptyStorageNodeClient) AddLogStreamReplica ¶
func (rc *EmptyStorageNodeClient) AddLogStreamReplica(context.Context, types.TopicID, types.LogStreamID, string) (snpb.LogStreamReplicaMetadataDescriptor, error)
func (*EmptyStorageNodeClient) Close ¶
func (rc *EmptyStorageNodeClient) Close() error
func (*EmptyStorageNodeClient) Commit ¶
func (rc *EmptyStorageNodeClient) Commit(snpb.CommitRequest) error
func (*EmptyStorageNodeClient) CommitBatch ¶ added in v0.7.0
func (rc *EmptyStorageNodeClient) CommitBatch(snpb.CommitBatchRequest) error
func (*EmptyStorageNodeClient) GetMetadata ¶
func (rc *EmptyStorageNodeClient) GetMetadata(context.Context) (*snpb.StorageNodeMetadataDescriptor, error)
func (*EmptyStorageNodeClient) GetReport ¶
func (rc *EmptyStorageNodeClient) GetReport() (*snpb.GetReportResponse, error)
func (*EmptyStorageNodeClient) RemoveLogStream ¶
func (rc *EmptyStorageNodeClient) RemoveLogStream(context.Context, types.TopicID, types.LogStreamID) error
func (*EmptyStorageNodeClient) Seal ¶
func (rc *EmptyStorageNodeClient) Seal(context.Context, types.TopicID, types.LogStreamID, types.GLSN) (varlogpb.LogStreamStatus, types.GLSN, error)
func (*EmptyStorageNodeClient) Sync ¶
func (rc *EmptyStorageNodeClient) Sync(context.Context, types.TopicID, types.LogStreamID, types.StorageNodeID, string, types.GLSN) (*snpb.SyncStatus, error)
func (*EmptyStorageNodeClient) Target ¶
func (rc *EmptyStorageNodeClient) Target() varlogpb.StorageNode
func (*EmptyStorageNodeClient) Unseal ¶
func (rc *EmptyStorageNodeClient) Unseal(context.Context, types.TopicID, types.LogStreamID, []varlogpb.LogStreamReplica) error
type EmptyStorageNodeClientFactory ¶
type EmptyStorageNodeClientFactory struct { }
func NewEmptyStorageNodeClientFactory ¶
func NewEmptyStorageNodeClientFactory() *EmptyStorageNodeClientFactory
func (*EmptyStorageNodeClientFactory) GetManagementClient ¶
func (*EmptyStorageNodeClientFactory) GetReporterClient ¶
func (rcf *EmptyStorageNodeClientFactory) GetReporterClient(context.Context, *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
type Management ¶
type ManagementService ¶
type ManagementService struct {
// contains filtered or unexported fields
}
func NewManagementService ¶
func NewManagementService(m Management) *ManagementService
func (*ManagementService) AddPeer ¶
func (s *ManagementService) AddPeer(ctx context.Context, req *mrpb.AddPeerRequest) (*types.Empty, error)
func (*ManagementService) GetClusterInfo ¶
func (s *ManagementService) GetClusterInfo(ctx context.Context, req *mrpb.GetClusterInfoRequest) (*mrpb.GetClusterInfoResponse, error)
func (*ManagementService) Register ¶
func (s *ManagementService) Register(server *grpc.Server)
func (*ManagementService) RemovePeer ¶
func (s *ManagementService) RemovePeer(ctx context.Context, req *mrpb.RemovePeerRequest) (*types.Empty, error)
type Membership ¶
type Membership interface {
	SetLeader(types.NodeID)
	Leader() types.NodeID
	AddPeer(types.NodeID, string, bool, *raftpb.ConfState, uint64) error
	RemovePeer(types.NodeID, *raftpb.ConfState, uint64) error
	GetPeers() *mrpb.MetadataRepositoryDescriptor_PeerDescriptorMap
	IsMember(types.NodeID) bool
	IsLearner(types.NodeID) bool
	Clear()
}
type MetadataRepository ¶
type MetadataRepository interface {
	RegisterStorageNode(context.Context, *varlogpb.StorageNodeDescriptor) error
	UnregisterStorageNode(context.Context, types.StorageNodeID) error
	RegisterTopic(context.Context, types.TopicID) error
	UnregisterTopic(context.Context, types.TopicID) error
	RegisterLogStream(context.Context, *varlogpb.LogStreamDescriptor) error
	UnregisterLogStream(context.Context, types.LogStreamID) error
	UpdateLogStream(context.Context, *varlogpb.LogStreamDescriptor) error
	GetMetadata(context.Context) (*varlogpb.MetadataDescriptor, error)
	Seal(context.Context, types.LogStreamID) (types.GLSN, error)
	Unseal(context.Context, types.LogStreamID) error
	Close() error
}
type MetadataRepositoryService ¶
type MetadataRepositoryService struct {
// contains filtered or unexported fields
}
func NewMetadataRepositoryService ¶
func NewMetadataRepositoryService(metaRepos MetadataRepository) *MetadataRepositoryService
func (*MetadataRepositoryService) GetMetadata ¶
func (s *MetadataRepositoryService) GetMetadata(ctx context.Context, req *mrpb.GetMetadataRequest) (*mrpb.GetMetadataResponse, error)
func (*MetadataRepositoryService) Register ¶
func (s *MetadataRepositoryService) Register(server *grpc.Server)
func (*MetadataRepositoryService) RegisterLogStream ¶
func (s *MetadataRepositoryService) RegisterLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)
func (*MetadataRepositoryService) RegisterStorageNode ¶
func (s *MetadataRepositoryService) RegisterStorageNode(ctx context.Context, req *mrpb.StorageNodeRequest) (*types.Empty, error)
func (*MetadataRepositoryService) RegisterTopic ¶
func (s *MetadataRepositoryService) RegisterTopic(ctx context.Context, req *mrpb.TopicRequest) (*types.Empty, error)
func (*MetadataRepositoryService) Seal ¶
func (s *MetadataRepositoryService) Seal(ctx context.Context, req *mrpb.SealRequest) (*mrpb.SealResponse, error)
func (*MetadataRepositoryService) UnregisterLogStream ¶
func (s *MetadataRepositoryService) UnregisterLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)
func (*MetadataRepositoryService) UnregisterStorageNode ¶
func (s *MetadataRepositoryService) UnregisterStorageNode(ctx context.Context, req *mrpb.StorageNodeRequest) (*types.Empty, error)
func (*MetadataRepositoryService) UnregisterTopic ¶
func (s *MetadataRepositoryService) UnregisterTopic(ctx context.Context, req *mrpb.TopicRequest) (*types.Empty, error)
func (*MetadataRepositoryService) Unseal ¶
func (s *MetadataRepositoryService) Unseal(ctx context.Context, req *mrpb.UnsealRequest) (*mrpb.UnsealResponse, error)
func (*MetadataRepositoryService) UpdateLogStream ¶
func (s *MetadataRepositoryService) UpdateLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)
type MetadataStorage ¶
type MetadataStorage struct {
// contains filtered or unexported fields
}
TODO: refactor this type.
func NewMetadataStorage ¶
func (*MetadataStorage) AppendLogStreamCommitHistory ¶
func (ms *MetadataStorage) AppendLogStreamCommitHistory(cr *mrpb.LogStreamCommitResults)
func (*MetadataStorage) ApplySnapshot ¶
func (*MetadataStorage) Clear ¶
func (ms *MetadataStorage) Clear()
func (*MetadataStorage) Close ¶
func (ms *MetadataStorage) Close()
func (*MetadataStorage) GetFirstCommitResults ¶
func (ms *MetadataStorage) GetFirstCommitResults() *mrpb.LogStreamCommitResults
func (*MetadataStorage) GetLastCommitResults ¶
func (ms *MetadataStorage) GetLastCommitResults() *mrpb.LogStreamCommitResults
func (*MetadataStorage) GetLastCommitVersion ¶
func (ms *MetadataStorage) GetLastCommitVersion() types.Version
func (*MetadataStorage) GetLogStreamCommitResults ¶
func (ms *MetadataStorage) GetLogStreamCommitResults() []*mrpb.LogStreamCommitResults
func (*MetadataStorage) GetLogStreams ¶
func (ms *MetadataStorage) GetLogStreams() []*varlogpb.LogStreamDescriptor
func (*MetadataStorage) GetMetadata ¶
func (ms *MetadataStorage) GetMetadata() *varlogpb.MetadataDescriptor
func (*MetadataStorage) GetMinCommitVersion ¶
func (ms *MetadataStorage) GetMinCommitVersion() types.Version
func (*MetadataStorage) GetPeers ¶
func (ms *MetadataStorage) GetPeers() *mrpb.MetadataRepositoryDescriptor_PeerDescriptorMap
func (*MetadataStorage) GetSnapshot ¶
func (ms *MetadataStorage) GetSnapshot() ([]byte, *raftpb.ConfState, uint64)
func (*MetadataStorage) GetSnapshotIndex ¶
func (ms *MetadataStorage) GetSnapshotIndex() uint64
func (*MetadataStorage) GetSortedTopicLogStreamIDs ¶
func (ms *MetadataStorage) GetSortedTopicLogStreamIDs() []TopicLSID
func (*MetadataStorage) GetStorageNodes ¶
func (ms *MetadataStorage) GetStorageNodes() []*varlogpb.StorageNodeDescriptor
func (*MetadataStorage) Leader ¶
func (ms *MetadataStorage) Leader() types.NodeID
func (*MetadataStorage) LookupEndpoint ¶
func (ms *MetadataStorage) LookupEndpoint(nodeID types.NodeID) string
func (*MetadataStorage) LookupLogStream ¶
func (ms *MetadataStorage) LookupLogStream(lsID types.LogStreamID) *varlogpb.LogStreamDescriptor
func (*MetadataStorage) LookupNextCommitResults ¶
func (ms *MetadataStorage) LookupNextCommitResults(ver types.Version) (*mrpb.LogStreamCommitResults, error)
func (*MetadataStorage) LookupStorageNode ¶
func (ms *MetadataStorage) LookupStorageNode(snID types.StorageNodeID) *varlogpb.StorageNodeDescriptor
func (*MetadataStorage) LookupUncommitReport ¶
func (ms *MetadataStorage) LookupUncommitReport(lsID types.LogStreamID, snID types.StorageNodeID) (snpb.LogStreamUncommitReport, bool)
func (*MetadataStorage) LookupUncommitReports ¶
func (ms *MetadataStorage) LookupUncommitReports(lsID types.LogStreamID) *mrpb.LogStreamUncommitReports
func (*MetadataStorage) NumUpdateSinceCommit ¶
func (ms *MetadataStorage) NumUpdateSinceCommit() uint64
func (*MetadataStorage) RecoverStateMachine ¶
func (ms *MetadataStorage) RecoverStateMachine(stateMachine *mrpb.MetadataRepositoryDescriptor, appliedIndex, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) RegisterEndpoint ¶
func (*MetadataStorage) RegisterLogStream ¶
func (ms *MetadataStorage) RegisterLogStream(ls *varlogpb.LogStreamDescriptor, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) RegisterStorageNode ¶
func (ms *MetadataStorage) RegisterStorageNode(sn *varlogpb.StorageNodeDescriptor, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) RegisterTopic ¶
func (ms *MetadataStorage) RegisterTopic(topic *varlogpb.TopicDescriptor, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) RemovePeer ¶
func (*MetadataStorage) ResetUpdateSinceCommit ¶
func (ms *MetadataStorage) ResetUpdateSinceCommit()
func (*MetadataStorage) Run ¶
func (ms *MetadataStorage) Run()
func (*MetadataStorage) SealLogStream ¶
func (ms *MetadataStorage) SealLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) SealingLogStream ¶
func (ms *MetadataStorage) SealingLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) SetLeader ¶
func (ms *MetadataStorage) SetLeader(nodeID types.NodeID)
func (*MetadataStorage) TrimLogStreamCommitHistory ¶
func (ms *MetadataStorage) TrimLogStreamCommitHistory(ver types.Version) error
func (*MetadataStorage) UnregisterLogStream ¶
func (ms *MetadataStorage) UnregisterLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) UnregisterStorageNode ¶
func (ms *MetadataStorage) UnregisterStorageNode(snID types.StorageNodeID, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) UnregisterTopic ¶
func (ms *MetadataStorage) UnregisterTopic(topicID types.TopicID, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) UnsealLogStream ¶
func (ms *MetadataStorage) UnsealLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) UpdateAppliedIndex ¶
func (ms *MetadataStorage) UpdateAppliedIndex(appliedIndex uint64)
func (*MetadataStorage) UpdateLogStream ¶
func (ms *MetadataStorage) UpdateLogStream(ls *varlogpb.LogStreamDescriptor, nodeIndex, requestIndex uint64) error
func (*MetadataStorage) UpdateUncommitReport ¶
func (ms *MetadataStorage) UpdateUncommitReport(lsID types.LogStreamID, snID types.StorageNodeID, s snpb.LogStreamUncommitReport)
type Option ¶ added in v0.7.0
type Option interface {
// contains filtered or unexported methods
}
func JoinCluster ¶ added in v0.7.0
func JoinCluster() Option
func WithClusterID ¶ added in v0.7.0
func WithCommitTick ¶ added in v0.7.0
func WithDebugAddress ¶ added in v0.7.0
func WithLogger ¶ added in v0.7.0
func WithMaxLogStreamsCountPerTopic ¶ added in v0.10.0
func WithMaxSnapPurgeCount ¶ added in v0.7.0
func WithMaxTopicsCount ¶ added in v0.10.0
func WithMaxWALPurgeCount ¶ added in v0.7.0
func WithRPCAddress ¶ added in v0.7.0
func WithRPCTimeout ¶ added in v0.7.0
func WithRaftAddress ¶ added in v0.7.0
func WithRaftDirectory ¶ added in v0.7.0
func WithRaftTick ¶ added in v0.7.0
func WithReplicationFactor ¶ added in v0.7.0
func WithReportCommitterReadBufferSize ¶ added in v0.7.0
func WithReportCommitterWriteBufferSize ¶ added in v0.7.0
func WithReporterClientFactory ¶ added in v0.7.0
func WithReporterClientFactory(reporterClientFac ReporterClientFactory) Option
func WithSnapshotCatchUpCount ¶ added in v0.7.0
func WithSnapshotCount ¶ added in v0.7.0
func WithTelemetryCollectorEndpoint ¶ added in v0.7.0
func WithTelemetryCollectorName ¶ added in v0.7.0
type RaftLogger ¶
type RaftLogger struct {
*zap.SugaredLogger
}
func NewRaftLogger ¶
func NewRaftLogger(logger *zap.Logger) *RaftLogger
func (*RaftLogger) Warning ¶
func (l *RaftLogger) Warning(v ...interface{})
func (*RaftLogger) Warningf ¶
func (l *RaftLogger) Warningf(template string, args ...interface{})
type RaftMetadataRepository ¶
type RaftMetadataRepository struct {
// contains filtered or unexported fields
}
func NewRaftMetadataRepository ¶
func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository
func (*RaftMetadataRepository) Close ¶
func (mr *RaftMetadataRepository) Close() error
TODO: handle pending messages.
func (*RaftMetadataRepository) GetClusterInfo ¶
func (mr *RaftMetadataRepository) GetClusterInfo(context.Context, types.ClusterID) (*mrpb.ClusterInfo, error)
func (*RaftMetadataRepository) GetLastCommitResults ¶
func (mr *RaftMetadataRepository) GetLastCommitResults() *mrpb.LogStreamCommitResults
func (*RaftMetadataRepository) GetLastCommitVersion ¶
func (mr *RaftMetadataRepository) GetLastCommitVersion() types.Version
func (*RaftMetadataRepository) GetMetadata ¶
func (mr *RaftMetadataRepository) GetMetadata(context.Context) (*varlogpb.MetadataDescriptor, error)
func (*RaftMetadataRepository) GetOldestCommitVersion ¶
func (mr *RaftMetadataRepository) GetOldestCommitVersion() types.Version
func (*RaftMetadataRepository) GetReportCount ¶
func (mr *RaftMetadataRepository) GetReportCount() uint64
func (*RaftMetadataRepository) GetReporterClient ¶
func (mr *RaftMetadataRepository) GetReporterClient(ctx context.Context, sn *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
func (*RaftMetadataRepository) GetServerAddr ¶
func (mr *RaftMetadataRepository) GetServerAddr() string
func (*RaftMetadataRepository) IsLearner ¶
func (mr *RaftMetadataRepository) IsLearner() bool
func (*RaftMetadataRepository) IsMember ¶
func (mr *RaftMetadataRepository) IsMember() bool
func (*RaftMetadataRepository) LookupNextCommitResults ¶
func (mr *RaftMetadataRepository) LookupNextCommitResults(ver types.Version) (*mrpb.LogStreamCommitResults, error)
func (*RaftMetadataRepository) ProposeReport ¶
func (mr *RaftMetadataRepository) ProposeReport(snID types.StorageNodeID, ur []snpb.LogStreamUncommitReport) error
func (*RaftMetadataRepository) RegisterLogStream ¶
func (mr *RaftMetadataRepository) RegisterLogStream(ctx context.Context, ls *varlogpb.LogStreamDescriptor) error
func (*RaftMetadataRepository) RegisterStorageNode ¶
func (mr *RaftMetadataRepository) RegisterStorageNode(ctx context.Context, sn *varlogpb.StorageNodeDescriptor) error
func (*RaftMetadataRepository) RegisterTopic ¶
func (*RaftMetadataRepository) RemovePeer ¶
func (*RaftMetadataRepository) Run ¶
func (mr *RaftMetadataRepository) Run()
func (*RaftMetadataRepository) Seal ¶
func (mr *RaftMetadataRepository) Seal(ctx context.Context, lsID types.LogStreamID) (types.GLSN, error)
func (*RaftMetadataRepository) UnregisterLogStream ¶
func (mr *RaftMetadataRepository) UnregisterLogStream(ctx context.Context, lsID types.LogStreamID) error
func (*RaftMetadataRepository) UnregisterStorageNode ¶
func (mr *RaftMetadataRepository) UnregisterStorageNode(ctx context.Context, snID types.StorageNodeID) error
func (*RaftMetadataRepository) UnregisterTopic ¶
func (*RaftMetadataRepository) Unseal ¶
func (mr *RaftMetadataRepository) Unseal(ctx context.Context, lsID types.LogStreamID) error
func (*RaftMetadataRepository) UpdateLogStream ¶
func (mr *RaftMetadataRepository) UpdateLogStream(ctx context.Context, ls *varlogpb.LogStreamDescriptor) error
func (*RaftMetadataRepository) Wait ¶
func (mr *RaftMetadataRepository) Wait()
type ReportCollector ¶
type ReportCollector interface {
Run() error
Reset()
Close()
Recover([]*varlogpb.StorageNodeDescriptor, []*varlogpb.LogStreamDescriptor, types.Version) error
RegisterStorageNode(*varlogpb.StorageNodeDescriptor) error
UnregisterStorageNode(types.StorageNodeID) error
RegisterLogStream(types.TopicID, types.StorageNodeID, types.LogStreamID, types.Version, varlogpb.LogStreamStatus) error
UnregisterLogStream(types.StorageNodeID, types.LogStreamID) error
Commit()
Seal(types.LogStreamID)
Unseal(types.LogStreamID, types.Version)
NumExecutors() int
NumCommitter() int
}
type ReportCollectorHelper ¶
type ReportCollectorHelper interface {
ProposeReport(types.StorageNodeID, []snpb.LogStreamUncommitReport) error
GetReporterClient(context.Context, *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
GetLastCommitResults() *mrpb.LogStreamCommitResults
LookupNextCommitResults(types.Version) (*mrpb.LogStreamCommitResults, error)
}
type ReporterClientFactory ¶
type ReporterClientFactory interface {
GetReporterClient(context.Context, *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
}
type SnapshotGetter ¶
Source Files ¶
- config.go
- dummy_storagenode_client_factory_impl.go
- management.go
- management_service.go
- metadata_repository.go
- metadata_repository_metrics.go
- metadata_repository_service.go
- raft.go
- raft_logger.go
- raft_metadata_repository.go
- report_collector.go
- reporter_client_factory.go
- reporter_client_factory_impl.go
- storage.go
- telemetry.go
Click to show internal directories.
Click to hide internal directories.