metarepos

package
v0.13.0
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2023 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultClusterID                        = types.ClusterID(1)
	DefaultRPCBindAddress                   = "0.0.0.0:9092"
	DefaultDebugAddress                     = "0.0.0.0:9099"
	DefaultRaftPort                         = 10000
	DefaultSnapshotCount             uint64 = 10000
	DefaultSnapshotCatchUpCount      uint64 = 10000
	DefaultSnapshotPurgeCount        uint   = 10
	DefaultWalPurgeCount             uint   = 10
	DefaultLogReplicationFactor      int    = 1
	DefaultProposeTimeout                   = 100 * time.Millisecond
	DefaultRaftTick                         = 100 * time.Millisecond
	DefaultRPCTimeout                       = 100 * time.Millisecond
	DefaultCommitTick                       = 1 * time.Millisecond
	DefaultPromoteTick                      = 100 * time.Millisecond
	DefaultRaftDir                   string = "raftdata"
	DefaultLogDir                    string = "log"
	DefaultTelemetryCollectorName    string = "nop"
	DefaultTelmetryCollectorEndpoint string = "localhost:55680"

	DefaultReportCommitterReadBufferSize  = 32 * 1024 // 32KB
	DefaultReportCommitterWriteBufferSize = 32 * 1024 // 32KB

	DefaultMaxTopicsCount             = -1
	DefaultMaxLogStreamsCountPerTopic = -1

	UnusedRequestIndex uint64 = 0
)
View Source
const DefaultCatchupRefreshTime = 3 * time.Millisecond
View Source
const DefaultDelay = 500 * time.Microsecond
View Source
const DefaultReportRefreshTime = time.Second
View Source
const DefaultSampleReportsRate = 1000
View Source
const (
	PromoteRate = 0.9
)

Variables

View Source
var (
	DefaultRaftAddress = makeDefaultRaftAddress()
)

Functions

func NewReportCollector

func NewReportCollector(helper ReportCollectorHelper, rpcTimeout time.Duration, tmStub *telemetryStub, logger *zap.Logger) *reportCollector

func NewReporterClientFactory

func NewReporterClientFactory(grpcDialOptions ...grpc.DialOption) *reporterClientFactory

Types

type DummyStorageNodeClient

type DummyStorageNodeClient struct {
	// contains filtered or unexported fields
}

func (*DummyStorageNodeClient) Close

func (r *DummyStorageNodeClient) Close() error

func (*DummyStorageNodeClient) Commit

func (*DummyStorageNodeClient) CommitBatch added in v0.7.0

func (*DummyStorageNodeClient) DisableCommit added in v0.10.0

func (r *DummyStorageNodeClient) DisableCommit()

func (*DummyStorageNodeClient) DisableReport

func (r *DummyStorageNodeClient) DisableReport()

func (*DummyStorageNodeClient) EnableCommmit added in v0.10.0

func (r *DummyStorageNodeClient) EnableCommmit()

func (*DummyStorageNodeClient) EnableReport

func (r *DummyStorageNodeClient) EnableReport()

func (*DummyStorageNodeClient) GetMetadata

func (*DummyStorageNodeClient) GetReport

func (*DummyStorageNodeClient) RemoveLogStream

func (*DummyStorageNodeClient) SetCommitDelay

func (r *DummyStorageNodeClient) SetCommitDelay(d time.Duration)

func (*DummyStorageNodeClient) SetReportDelay

func (r *DummyStorageNodeClient) SetReportDelay(d time.Duration)

func (*DummyStorageNodeClient) Target

func (*DummyStorageNodeClient) Trim

type DummyStorageNodeClientFactory

type DummyStorageNodeClientFactory struct {
	// contains filtered or unexported fields
}

func NewDummyStorageNodeClientFactory

func NewDummyStorageNodeClientFactory(nrLogStreams int, manual bool) *DummyStorageNodeClientFactory

func (*DummyStorageNodeClientFactory) GetManagementClient

func (*DummyStorageNodeClientFactory) GetReporterClient

type DummyStorageNodeClientStatus

type DummyStorageNodeClientStatus int32
const (
	DummyStorageNodeClientStatusRunning DummyStorageNodeClientStatus = iota
	DummyStorageNodeClientStatusClosed
	DummyStorageNodeClientStatusCrash
)

type EmptyStorageNodeClient

type EmptyStorageNodeClient struct {
}

func (*EmptyStorageNodeClient) Close

func (rc *EmptyStorageNodeClient) Close() error

func (*EmptyStorageNodeClient) Commit

func (*EmptyStorageNodeClient) CommitBatch added in v0.7.0

func (*EmptyStorageNodeClient) GetMetadata

func (*EmptyStorageNodeClient) GetReport

func (rc *EmptyStorageNodeClient) GetReport() (*snpb.GetReportResponse, error)

func (*EmptyStorageNodeClient) RemoveLogStream

func (*EmptyStorageNodeClient) Target

func (*EmptyStorageNodeClient) Trim

func (*EmptyStorageNodeClient) Unseal

type EmptyStorageNodeClientFactory

type EmptyStorageNodeClientFactory struct {
}

func NewEmptyStorageNodeClientFactory

func NewEmptyStorageNodeClientFactory() *EmptyStorageNodeClientFactory

func (*EmptyStorageNodeClientFactory) GetManagementClient

func (*EmptyStorageNodeClientFactory) GetReporterClient

type Management

type Management interface {
	AddPeer(context.Context, types.ClusterID, types.NodeID, string) error
	RemovePeer(context.Context, types.ClusterID, types.NodeID) error
	GetClusterInfo(context.Context, types.ClusterID) (*mrpb.ClusterInfo, error)
}

type ManagementService

type ManagementService struct {
	// contains filtered or unexported fields
}

func NewManagementService

func NewManagementService(m Management) *ManagementService

func (*ManagementService) AddPeer

func (*ManagementService) GetClusterInfo

func (*ManagementService) Register

func (s *ManagementService) Register(server *grpc.Server)

func (*ManagementService) RemovePeer

func (s *ManagementService) RemovePeer(ctx context.Context, req *mrpb.RemovePeerRequest) (*types.Empty, error)

type Membership

type Membership interface {
	SetLeader(types.NodeID)

	Leader() types.NodeID

	AddPeer(types.NodeID, string, bool, *raftpb.ConfState, uint64) error

	RemovePeer(types.NodeID, *raftpb.ConfState, uint64) error

	GetPeers() *mrpb.MetadataRepositoryDescriptor_PeerDescriptorMap

	IsMember(types.NodeID) bool

	IsLearner(types.NodeID) bool

	Clear()
}

type MetadataRepository

type MetadataRepository interface {
	RegisterStorageNode(context.Context, *varlogpb.StorageNodeDescriptor) error
	UnregisterStorageNode(context.Context, types.StorageNodeID) error
	RegisterTopic(context.Context, types.TopicID) error
	UnregisterTopic(context.Context, types.TopicID) error
	RegisterLogStream(context.Context, *varlogpb.LogStreamDescriptor) error
	UnregisterLogStream(context.Context, types.LogStreamID) error
	UpdateLogStream(context.Context, *varlogpb.LogStreamDescriptor) error
	GetMetadata(context.Context) (*varlogpb.MetadataDescriptor, error)
	Seal(context.Context, types.LogStreamID) (types.GLSN, error)
	Unseal(context.Context, types.LogStreamID) error
	Close() error
}

type MetadataRepositoryService

type MetadataRepositoryService struct {
	// contains filtered or unexported fields
}

func NewMetadataRepositoryService

func NewMetadataRepositoryService(metaRepos MetadataRepository) *MetadataRepositoryService

func (*MetadataRepositoryService) GetMetadata

func (*MetadataRepositoryService) Register

func (s *MetadataRepositoryService) Register(server *grpc.Server)

func (*MetadataRepositoryService) RegisterLogStream

func (s *MetadataRepositoryService) RegisterLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)

func (*MetadataRepositoryService) RegisterStorageNode

func (s *MetadataRepositoryService) RegisterStorageNode(ctx context.Context, req *mrpb.StorageNodeRequest) (*types.Empty, error)

func (*MetadataRepositoryService) RegisterTopic

func (s *MetadataRepositoryService) RegisterTopic(ctx context.Context, req *mrpb.TopicRequest) (*types.Empty, error)

func (*MetadataRepositoryService) Seal

func (*MetadataRepositoryService) UnregisterLogStream

func (s *MetadataRepositoryService) UnregisterLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)

func (*MetadataRepositoryService) UnregisterStorageNode

func (s *MetadataRepositoryService) UnregisterStorageNode(ctx context.Context, req *mrpb.StorageNodeRequest) (*types.Empty, error)

func (*MetadataRepositoryService) UnregisterTopic

func (s *MetadataRepositoryService) UnregisterTopic(ctx context.Context, req *mrpb.TopicRequest) (*types.Empty, error)

func (*MetadataRepositoryService) Unseal

func (*MetadataRepositoryService) UpdateLogStream

func (s *MetadataRepositoryService) UpdateLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)

type MetadataStorage

type MetadataStorage struct {
	// contains filtered or unexported fields
}

TODO: refactor.

func NewMetadataStorage

func NewMetadataStorage(cb func(uint64, uint64, error), snapCount uint64, logger *zap.Logger) *MetadataStorage

func (*MetadataStorage) AddPeer

func (ms *MetadataStorage) AddPeer(nodeID types.NodeID, url string, isLearner bool, cs *raftpb.ConfState, appliedIndex uint64) error

func (*MetadataStorage) AppendLogStreamCommitHistory

func (ms *MetadataStorage) AppendLogStreamCommitHistory(cr *mrpb.LogStreamCommitResults)

func (*MetadataStorage) ApplySnapshot

func (ms *MetadataStorage) ApplySnapshot(snap []byte, snapConfState *raftpb.ConfState, snapIndex uint64) error

func (*MetadataStorage) Clear

func (ms *MetadataStorage) Clear()

func (*MetadataStorage) Close

func (ms *MetadataStorage) Close()

func (*MetadataStorage) GetFirstCommitResults

func (ms *MetadataStorage) GetFirstCommitResults() *mrpb.LogStreamCommitResults

func (*MetadataStorage) GetLastCommitResults

func (ms *MetadataStorage) GetLastCommitResults() *mrpb.LogStreamCommitResults

func (*MetadataStorage) GetLastCommitVersion

func (ms *MetadataStorage) GetLastCommitVersion() types.Version

func (*MetadataStorage) GetLogStreamCommitResults

func (ms *MetadataStorage) GetLogStreamCommitResults() []*mrpb.LogStreamCommitResults

func (*MetadataStorage) GetLogStreams

func (ms *MetadataStorage) GetLogStreams() []*varlogpb.LogStreamDescriptor

func (*MetadataStorage) GetMetadata

func (ms *MetadataStorage) GetMetadata() *varlogpb.MetadataDescriptor

func (*MetadataStorage) GetMinCommitVersion

func (ms *MetadataStorage) GetMinCommitVersion() types.Version

func (*MetadataStorage) GetPeers

func (*MetadataStorage) GetSnapshot

func (ms *MetadataStorage) GetSnapshot() ([]byte, *raftpb.ConfState, uint64)

func (*MetadataStorage) GetSnapshotIndex

func (ms *MetadataStorage) GetSnapshotIndex() uint64

func (*MetadataStorage) GetSortedTopicLogStreamIDs

func (ms *MetadataStorage) GetSortedTopicLogStreamIDs() []TopicLSID

func (*MetadataStorage) GetStorageNodes

func (ms *MetadataStorage) GetStorageNodes() []*varlogpb.StorageNodeDescriptor

func (*MetadataStorage) IsLearner

func (ms *MetadataStorage) IsLearner(nodeID types.NodeID) bool

func (*MetadataStorage) IsMember

func (ms *MetadataStorage) IsMember(nodeID types.NodeID) bool

func (*MetadataStorage) Leader

func (ms *MetadataStorage) Leader() types.NodeID

func (*MetadataStorage) LookupEndpoint

func (ms *MetadataStorage) LookupEndpoint(nodeID types.NodeID) string

func (*MetadataStorage) LookupLogStream

func (ms *MetadataStorage) LookupLogStream(lsID types.LogStreamID) *varlogpb.LogStreamDescriptor

func (*MetadataStorage) LookupNextCommitResults

func (ms *MetadataStorage) LookupNextCommitResults(ver types.Version) (*mrpb.LogStreamCommitResults, error)

func (*MetadataStorage) LookupStorageNode

func (ms *MetadataStorage) LookupStorageNode(snID types.StorageNodeID) *varlogpb.StorageNodeDescriptor

func (*MetadataStorage) LookupUncommitReport

func (ms *MetadataStorage) LookupUncommitReport(lsID types.LogStreamID, snID types.StorageNodeID) (snpb.LogStreamUncommitReport, bool)

func (*MetadataStorage) LookupUncommitReports

func (ms *MetadataStorage) LookupUncommitReports(lsID types.LogStreamID) *mrpb.LogStreamUncommitReports

func (*MetadataStorage) NumUpdateSinceCommit

func (ms *MetadataStorage) NumUpdateSinceCommit() uint64

func (*MetadataStorage) RecoverStateMachine

func (ms *MetadataStorage) RecoverStateMachine(stateMachine *mrpb.MetadataRepositoryDescriptor, appliedIndex, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RegisterEndpoint

func (ms *MetadataStorage) RegisterEndpoint(nodeID types.NodeID, url string, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RegisterLogStream

func (ms *MetadataStorage) RegisterLogStream(ls *varlogpb.LogStreamDescriptor, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RegisterStorageNode

func (ms *MetadataStorage) RegisterStorageNode(sn *varlogpb.StorageNodeDescriptor, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RegisterTopic

func (ms *MetadataStorage) RegisterTopic(topic *varlogpb.TopicDescriptor, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RemovePeer

func (ms *MetadataStorage) RemovePeer(nodeID types.NodeID, cs *raftpb.ConfState, appliedIndex uint64) error

func (*MetadataStorage) ResetUpdateSinceCommit

func (ms *MetadataStorage) ResetUpdateSinceCommit()

func (*MetadataStorage) Run

func (ms *MetadataStorage) Run()

func (*MetadataStorage) SealLogStream

func (ms *MetadataStorage) SealLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) SealingLogStream

func (ms *MetadataStorage) SealingLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) SetLeader

func (ms *MetadataStorage) SetLeader(nodeID types.NodeID)

func (*MetadataStorage) TrimLogStreamCommitHistory

func (ms *MetadataStorage) TrimLogStreamCommitHistory(ver types.Version) error

func (*MetadataStorage) UnregisterLogStream

func (ms *MetadataStorage) UnregisterLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UnregisterStorageNode

func (ms *MetadataStorage) UnregisterStorageNode(snID types.StorageNodeID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UnregisterTopic

func (ms *MetadataStorage) UnregisterTopic(topicID types.TopicID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UnsealLogStream

func (ms *MetadataStorage) UnsealLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UpdateAppliedIndex

func (ms *MetadataStorage) UpdateAppliedIndex(appliedIndex uint64)

func (*MetadataStorage) UpdateLogStream

func (ms *MetadataStorage) UpdateLogStream(ls *varlogpb.LogStreamDescriptor, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UpdateUncommitReport

func (ms *MetadataStorage) UpdateUncommitReport(lsID types.LogStreamID, snID types.StorageNodeID, s snpb.LogStreamUncommitReport)

type Option added in v0.7.0

type Option interface {
	// contains filtered or unexported methods
}

func JoinCluster added in v0.7.0

func JoinCluster() Option

func WithClusterID added in v0.7.0

func WithClusterID(cid types.ClusterID) Option

func WithCommitTick added in v0.7.0

func WithCommitTick(commitTick time.Duration) Option

func WithDebugAddress added in v0.7.0

func WithDebugAddress(debugAddr string) Option

func WithLogger added in v0.7.0

func WithLogger(logger *zap.Logger) Option

func WithMaxLogStreamsCountPerTopic added in v0.10.0

func WithMaxLogStreamsCountPerTopic(maxLogStreamsCountPerTopic int32) Option

func WithMaxSnapPurgeCount added in v0.7.0

func WithMaxSnapPurgeCount(maxSnapPurgeCount uint) Option

func WithMaxTopicsCount added in v0.10.0

func WithMaxTopicsCount(maxTopicsCount int32) Option

func WithMaxWALPurgeCount added in v0.7.0

func WithMaxWALPurgeCount(maxWalPurgeCount uint) Option

func WithPeers added in v0.7.0

func WithPeers(peers ...string) Option

func WithRPCAddress added in v0.7.0

func WithRPCAddress(rpcAddr string) Option

func WithRPCTimeout added in v0.7.0

func WithRPCTimeout(rpcTimeout time.Duration) Option

func WithRaftAddress added in v0.7.0

func WithRaftAddress(raftAddr string) Option

func WithRaftDirectory added in v0.7.0

func WithRaftDirectory(raftDir string) Option

func WithRaftTick added in v0.7.0

func WithRaftTick(raftTick time.Duration) Option

func WithReplicationFactor added in v0.7.0

func WithReplicationFactor(replicationFactor int) Option

func WithReportCommitterReadBufferSize added in v0.7.0

func WithReportCommitterReadBufferSize(readBufferSize int) Option

func WithReportCommitterWriteBufferSize added in v0.7.0

func WithReportCommitterWriteBufferSize(writeBufferSize int) Option

func WithReporterClientFactory added in v0.7.0

func WithReporterClientFactory(reporterClientFac ReporterClientFactory) Option

func WithSnapshotCatchUpCount added in v0.7.0

func WithSnapshotCatchUpCount(snapCatchUpCount uint64) Option

func WithSnapshotCount added in v0.7.0

func WithSnapshotCount(snapshotCount uint64) Option

func WithTelemetryCollectorEndpoint added in v0.7.0

func WithTelemetryCollectorEndpoint(telemetryCollectorEndpoint string) Option

func WithTelemetryCollectorName added in v0.7.0

func WithTelemetryCollectorName(telemetryCollectorName string) Option

type RaftLogger

type RaftLogger struct {
	*zap.SugaredLogger
}

func NewRaftLogger

func NewRaftLogger(logger *zap.Logger) *RaftLogger

func (*RaftLogger) Warning

func (l *RaftLogger) Warning(v ...interface{})

func (*RaftLogger) Warningf

func (l *RaftLogger) Warningf(template string, args ...interface{})

type RaftMetadataRepository

type RaftMetadataRepository struct {
	// contains filtered or unexported fields
}

func NewRaftMetadataRepository

func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository

func (*RaftMetadataRepository) AddPeer

func (mr *RaftMetadataRepository) AddPeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID, url string) error

func (*RaftMetadataRepository) Close

func (mr *RaftMetadataRepository) Close() error

TODO: handle pending messages.

func (*RaftMetadataRepository) GetClusterInfo

func (*RaftMetadataRepository) GetLastCommitResults

func (mr *RaftMetadataRepository) GetLastCommitResults() *mrpb.LogStreamCommitResults

func (*RaftMetadataRepository) GetLastCommitVersion

func (mr *RaftMetadataRepository) GetLastCommitVersion() types.Version

func (*RaftMetadataRepository) GetMetadata

func (*RaftMetadataRepository) GetOldestCommitVersion

func (mr *RaftMetadataRepository) GetOldestCommitVersion() types.Version

func (*RaftMetadataRepository) GetReportCount

func (mr *RaftMetadataRepository) GetReportCount() uint64

func (*RaftMetadataRepository) GetReporterClient

func (*RaftMetadataRepository) GetServerAddr

func (mr *RaftMetadataRepository) GetServerAddr() string

func (*RaftMetadataRepository) IsLearner

func (mr *RaftMetadataRepository) IsLearner() bool

func (*RaftMetadataRepository) IsMember

func (mr *RaftMetadataRepository) IsMember() bool

func (*RaftMetadataRepository) LookupNextCommitResults

func (mr *RaftMetadataRepository) LookupNextCommitResults(ver types.Version) (*mrpb.LogStreamCommitResults, error)

func (*RaftMetadataRepository) ProposeReport

func (*RaftMetadataRepository) RegisterLogStream

func (mr *RaftMetadataRepository) RegisterLogStream(ctx context.Context, ls *varlogpb.LogStreamDescriptor) error

func (*RaftMetadataRepository) RegisterStorageNode

func (mr *RaftMetadataRepository) RegisterStorageNode(ctx context.Context, sn *varlogpb.StorageNodeDescriptor) error

func (*RaftMetadataRepository) RegisterTopic

func (mr *RaftMetadataRepository) RegisterTopic(ctx context.Context, topicID types.TopicID) error

func (*RaftMetadataRepository) RemovePeer

func (mr *RaftMetadataRepository) RemovePeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID) error

func (*RaftMetadataRepository) Run

func (mr *RaftMetadataRepository) Run()

func (*RaftMetadataRepository) Seal

func (*RaftMetadataRepository) UnregisterLogStream

func (mr *RaftMetadataRepository) UnregisterLogStream(ctx context.Context, lsID types.LogStreamID) error

func (*RaftMetadataRepository) UnregisterStorageNode

func (mr *RaftMetadataRepository) UnregisterStorageNode(ctx context.Context, snID types.StorageNodeID) error

func (*RaftMetadataRepository) UnregisterTopic

func (mr *RaftMetadataRepository) UnregisterTopic(ctx context.Context, topicID types.TopicID) error

func (*RaftMetadataRepository) Unseal

func (*RaftMetadataRepository) UpdateLogStream

func (*RaftMetadataRepository) Wait

func (mr *RaftMetadataRepository) Wait()

type ReportCollector

type ReportCollector interface {
	Run() error

	Reset()

	Close()

	Recover([]*varlogpb.StorageNodeDescriptor, []*varlogpb.LogStreamDescriptor, types.Version) error

	RegisterStorageNode(*varlogpb.StorageNodeDescriptor) error

	UnregisterStorageNode(types.StorageNodeID) error

	RegisterLogStream(types.TopicID, types.StorageNodeID, types.LogStreamID, types.Version, varlogpb.LogStreamStatus) error

	UnregisterLogStream(types.StorageNodeID, types.LogStreamID) error

	Commit()

	Seal(types.LogStreamID)

	Unseal(types.LogStreamID, types.Version)

	NumExecutors() int

	NumCommitter() int
}

type ReportCollectorHelper

type ReportCollectorHelper interface {
	ProposeReport(types.StorageNodeID, []snpb.LogStreamUncommitReport) error

	GetReporterClient(context.Context, *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)

	GetLastCommitResults() *mrpb.LogStreamCommitResults

	LookupNextCommitResults(types.Version) (*mrpb.LogStreamCommitResults, error)
}

type ReporterClientFactory

type ReporterClientFactory interface {
	GetReporterClient(context.Context, *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
}

type SnapshotGetter

type SnapshotGetter interface {
	GetSnapshotIndex() uint64

	GetSnapshot() ([]byte, *raftpb.ConfState, uint64)
}

type TopicLSID

type TopicLSID struct {
	TopicID     types.TopicID
	LogStreamID types.LogStreamID
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL