metarepos

package
v0.0.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2022 License: Apache-2.0 Imports: 53 Imported by: 0

Documentation

Index

Constants

View Source
// Default configuration values for the package.
//
// NOTE(review): which option each value backs is inferred from the similarly
// named fields of MetadataRepositoryOptions, RaftOptions and TelemetryOptions;
// confirm at the use sites.
const (
	DefaultRPCBindAddress                   = "0.0.0.0:9092"
	DefaultDebugAddress                     = "0.0.0.0:9099"
	DefaultRaftPort                         = 10000
	DefaultSnapshotCount             uint64 = 10000
	DefaultSnapshotCatchUpCount      uint64 = 10000
	DefaultSnapshotPurgeCount        uint   = 10
	DefaultWalPurgeCount             uint   = 10
	DefaultLogReplicationFactor      int    = 1
	DefaultProposeTimeout                   = 100 * time.Millisecond
	DefaultRaftTick                         = 100 * time.Millisecond
	DefaultRPCTimeout                       = 100 * time.Millisecond
	DefaultCommitTick                       = 1 * time.Millisecond
	DefaultPromoteTick                      = 100 * time.Millisecond
	DefaultRaftDir                   string = "raftdata"
	DefaultLogDir                    string = "log"
	DefaultTelemetryCollectorName    string = "nop"
	DefaultTelmetryCollectorEndpoint string = "localhost:55680"

	// DefaultTelemetryCollectorEndpoint is the correctly spelled name for
	// DefaultTelmetryCollectorEndpoint and always holds the same value. The
	// misspelled constant is kept so existing callers keep compiling; new
	// code should refer to this one.
	DefaultTelemetryCollectorEndpoint = DefaultTelmetryCollectorEndpoint

	// Report committer buffer sizes, 32KB each.
	DefaultReportCommitterReadBufferSize  = 32 * 1024 // 32KB
	DefaultReportCommitterWriteBufferSize = 32 * 1024 // 32KB

	// UnusedRequestIndex is the zero request index.
	// NOTE(review): presumably passed as requestIndex when no caller is
	// waiting on the request; confirm against the MetadataStorage methods.
	UnusedRequestIndex uint64 = 0
)
View Source
// DefaultCatchupRefreshTime is a duration of 3 milliseconds.
// NOTE(review): presumably the default interval between catch-up refreshes;
// confirm at the use site.
const DefaultCatchupRefreshTime = 3 * time.Millisecond
View Source
// DefaultDelay is a duration of 500 microseconds.
// NOTE(review): what is being delayed is not visible from this declaration;
// confirm at the use site.
const DefaultDelay = 500 * time.Microsecond
View Source
// DefaultReportRefreshTime is a duration of one second.
// NOTE(review): presumably the default interval between report refreshes;
// confirm at the use site.
const DefaultReportRefreshTime = time.Second
View Source
// DefaultSampleReportsRate is the untyped integer constant 1000.
// NOTE(review): the unit is not visible here (possibly "one in every N
// reports"); confirm at the use site.
const DefaultSampleReportsRate = 1000
View Source
// PromoteRate is the untyped floating-point constant 0.9.
// NOTE(review): presumably a ratio threshold used when promoting a peer;
// confirm at the use site.
const PromoteRate = 0.9

Variables

View Source
// DefaultRaftAddress is computed once, during package initialization, by
// makeDefaultRaftAddress.
var DefaultRaftAddress = makeDefaultRaftAddress()

Functions

func NewReportCollector

func NewReportCollector(helper ReportCollectorHelper, rpcTimeout time.Duration, tmStub *telemetryStub, logger *zap.Logger) *reportCollector

func NewReporterClientFactory

func NewReporterClientFactory(grpcDialOptions ...grpc.DialOption) *reporterClientFactory

Types

type DummyStorageNodeClient

type DummyStorageNodeClient struct {
	// contains filtered or unexported fields
}

func (*DummyStorageNodeClient) AddLogStreamReplica

func (*DummyStorageNodeClient) Close

func (r *DummyStorageNodeClient) Close() error

func (*DummyStorageNodeClient) Commit

func (*DummyStorageNodeClient) DisableReport

func (r *DummyStorageNodeClient) DisableReport()

func (*DummyStorageNodeClient) EnableReport

func (r *DummyStorageNodeClient) EnableReport()

func (*DummyStorageNodeClient) GetMetadata

func (*DummyStorageNodeClient) GetReport

func (*DummyStorageNodeClient) RemoveLogStream

func (*DummyStorageNodeClient) SetCommitDelay

func (r *DummyStorageNodeClient) SetCommitDelay(d time.Duration)

func (*DummyStorageNodeClient) SetReportDelay

func (r *DummyStorageNodeClient) SetReportDelay(d time.Duration)

func (*DummyStorageNodeClient) Target

func (*DummyStorageNodeClient) Trim

type DummyStorageNodeClientFactory

type DummyStorageNodeClientFactory struct {
	// contains filtered or unexported fields
}

func NewDummyStorageNodeClientFactory

func NewDummyStorageNodeClientFactory(nrLogStreams int, manual bool) *DummyStorageNodeClientFactory

func (*DummyStorageNodeClientFactory) GetManagementClient

func (*DummyStorageNodeClientFactory) GetReporterClient

type DummyStorageNodeClientStatus

// DummyStorageNodeClientStatus is an int32-backed status code.
// NOTE(review): presumably the lifecycle state of a DummyStorageNodeClient;
// confirm where the status is read and written.
type DummyStorageNodeClientStatus int32

// Status values, numbered from zero in declaration order:
// Running = 0, Closed = 1, Crash = 2.
const (
	DummyStorageNodeClientStatusRunning = DummyStorageNodeClientStatus(iota)
	DummyStorageNodeClientStatusClosed
	DummyStorageNodeClientStatusCrash
)

type EmptyStorageNodeClient

// EmptyStorageNodeClient is a struct with no fields, so it carries no state
// of its own.
// NOTE(review): presumably a no-op storage node client used as a stand-in;
// confirm against the method bodies.
type EmptyStorageNodeClient struct {
}

func (*EmptyStorageNodeClient) AddLogStreamReplica

func (*EmptyStorageNodeClient) Close

func (rc *EmptyStorageNodeClient) Close() error

func (*EmptyStorageNodeClient) Commit

func (*EmptyStorageNodeClient) GetMetadata

func (*EmptyStorageNodeClient) GetReport

func (rc *EmptyStorageNodeClient) GetReport() (*snpb.GetReportResponse, error)

func (*EmptyStorageNodeClient) RemoveLogStream

func (*EmptyStorageNodeClient) Target

func (*EmptyStorageNodeClient) Trim

func (*EmptyStorageNodeClient) Unseal

type EmptyStorageNodeClientFactory

// EmptyStorageNodeClientFactory is a struct with no fields; values are
// obtained from NewEmptyStorageNodeClientFactory.
// NOTE(review): presumably hands out EmptyStorageNodeClient values; confirm
// against GetManagementClient and GetReporterClient.
type EmptyStorageNodeClientFactory struct {
}

func NewEmptyStorageNodeClientFactory

func NewEmptyStorageNodeClientFactory() *EmptyStorageNodeClientFactory

func (*EmptyStorageNodeClientFactory) GetManagementClient

func (*EmptyStorageNodeClientFactory) GetReporterClient

type Management

// Management is the peer-administration interface accepted by
// NewManagementService. RaftMetadataRepository declares methods with these
// three names.
type Management interface {
	// AddPeer adds the peer with the given node ID to the given cluster. The
	// trailing string parameter is named url in RaftMetadataRepository.AddPeer.
	AddPeer(context.Context, types.ClusterID, types.NodeID, string) error
	// RemovePeer removes the peer with the given node ID from the given cluster.
	RemovePeer(context.Context, types.ClusterID, types.NodeID) error
	// GetClusterInfo returns the ClusterInfo for the given cluster, or an error.
	GetClusterInfo(context.Context, types.ClusterID) (*mrpb.ClusterInfo, error)
}

type ManagementService

type ManagementService struct {
	// contains filtered or unexported fields
}

func NewManagementService

func NewManagementService(m Management) *ManagementService

func (*ManagementService) AddPeer

func (*ManagementService) GetClusterInfo

func (*ManagementService) Register

func (s *ManagementService) Register(server *grpc.Server)

func (*ManagementService) RemovePeer

func (s *ManagementService) RemovePeer(ctx context.Context, req *mrpb.RemovePeerRequest) (*types.Empty, error)

type Membership

// Membership tracks the leader and the set of peers. MetadataStorage declares
// methods with every one of these names.
type Membership interface {
	// SetLeader records the given node ID as the leader.
	SetLeader(types.NodeID)

	// Leader returns a node ID.
	// NOTE(review): presumably the one last passed to SetLeader; confirm.
	Leader() types.NodeID

	// AddPeer adds a peer. In MetadataStorage.AddPeer the parameters are named
	// nodeID, url, isLearner, cs and appliedIndex, in that order.
	AddPeer(types.NodeID, string, bool, *raftpb.ConfState, uint64) error

	// RemovePeer removes a peer. In MetadataStorage.RemovePeer the parameters
	// are named nodeID, cs and appliedIndex, in that order.
	RemovePeer(types.NodeID, *raftpb.ConfState, uint64) error

	// GetPeers returns the peer descriptor map.
	GetPeers() *mrpb.MetadataRepositoryDescriptor_PeerDescriptorMap

	// IsMember reports whether the given node ID is a member.
	IsMember(types.NodeID) bool

	// IsLearner reports whether the given node ID is a learner.
	IsLearner(types.NodeID) bool

	// Clear takes no arguments and returns nothing.
	// NOTE(review): presumably resets all membership state; confirm.
	Clear()
}

type MetadataRepository

// MetadataRepository is the interface wrapped by NewMetadataRepositoryService.
// It covers registering and unregistering storage nodes, topics and log
// streams, updating a log stream, fetching metadata, sealing and unsealing a
// log stream, and closing the repository. RaftMetadataRepository declares
// methods with all of these names.
type MetadataRepository interface {
	// RegisterStorageNode registers the described storage node.
	RegisterStorageNode(context.Context, *varlogpb.StorageNodeDescriptor) error
	// UnregisterStorageNode unregisters the storage node with the given ID.
	UnregisterStorageNode(context.Context, types.StorageNodeID) error
	// RegisterTopic registers the topic with the given ID.
	RegisterTopic(context.Context, types.TopicID) error
	// UnregisterTopic unregisters the topic with the given ID.
	UnregisterTopic(context.Context, types.TopicID) error
	// RegisterLogStream registers the described log stream.
	RegisterLogStream(context.Context, *varlogpb.LogStreamDescriptor) error
	// UnregisterLogStream unregisters the log stream with the given ID.
	UnregisterLogStream(context.Context, types.LogStreamID) error
	// UpdateLogStream updates a log stream from the given descriptor.
	UpdateLogStream(context.Context, *varlogpb.LogStreamDescriptor) error
	// GetMetadata returns the metadata descriptor, or an error.
	GetMetadata(context.Context) (*varlogpb.MetadataDescriptor, error)
	// Seal seals the log stream with the given ID and returns a GLSN.
	// NOTE(review): the meaning of the returned GLSN is not visible here
	// (possibly the last committed GLSN); confirm against the implementation.
	Seal(context.Context, types.LogStreamID) (types.GLSN, error)
	// Unseal unseals the log stream with the given ID.
	Unseal(context.Context, types.LogStreamID) error
	// Close closes the repository.
	Close() error
}

type MetadataRepositoryOptions

// MetadataRepositoryOptions is the configuration passed to
// NewRaftMetadataRepository. It embeds TelemetryOptions and RaftOptions, so
// their fields are promoted onto this struct.
//
// NOTE(review): the Default* constants in this package presumably supply the
// defaults for the similarly named fields (for example DefaultRPCBindAddress
// for RPCBindAddress, DefaultCommitTick for CommitTick), and NumRep is
// presumably the log replication factor (cf. DefaultLogReplicationFactor).
// Neither mapping is visible in these declarations; confirm at the use sites.
type MetadataRepositoryOptions struct {
	TelemetryOptions
	RaftOptions

	RPCBindAddress                 string
	RaftAddress                    string
	DebugAddress                   string
	ClusterID                      types.ClusterID
	Verbose                        bool
	NumRep                         int
	RaftProposeTimeout             time.Duration
	RPCTimeout                     time.Duration
	CommitTick                     time.Duration
	PromoteTick                    time.Duration
	ReporterClientFac              ReporterClientFactory
	LogDir                         string
	Logger                         *zap.Logger
	ReportCommitterReadBufferSize  int
	ReportCommitterWriteBufferSize int
}

type MetadataRepositoryService

type MetadataRepositoryService struct {
	// contains filtered or unexported fields
}

func NewMetadataRepositoryService

func NewMetadataRepositoryService(metaRepos MetadataRepository) *MetadataRepositoryService

func (*MetadataRepositoryService) GetMetadata

func (*MetadataRepositoryService) Register

func (s *MetadataRepositoryService) Register(server *grpc.Server)

func (*MetadataRepositoryService) RegisterLogStream

func (s *MetadataRepositoryService) RegisterLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)

func (*MetadataRepositoryService) RegisterStorageNode

func (s *MetadataRepositoryService) RegisterStorageNode(ctx context.Context, req *mrpb.StorageNodeRequest) (*types.Empty, error)

func (*MetadataRepositoryService) RegisterTopic

func (s *MetadataRepositoryService) RegisterTopic(ctx context.Context, req *mrpb.TopicRequest) (*types.Empty, error)

func (*MetadataRepositoryService) Seal

func (*MetadataRepositoryService) UnregisterLogStream

func (s *MetadataRepositoryService) UnregisterLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)

func (*MetadataRepositoryService) UnregisterStorageNode

func (s *MetadataRepositoryService) UnregisterStorageNode(ctx context.Context, req *mrpb.StorageNodeRequest) (*types.Empty, error)

func (*MetadataRepositoryService) UnregisterTopic

func (s *MetadataRepositoryService) UnregisterTopic(ctx context.Context, req *mrpb.TopicRequest) (*types.Empty, error)

func (*MetadataRepositoryService) Unseal

func (*MetadataRepositoryService) UpdateLogStream

func (s *MetadataRepositoryService) UpdateLogStream(ctx context.Context, req *mrpb.LogStreamRequest) (*types.Empty, error)

type MetadataStorage

type MetadataStorage struct {
	// contains filtered or unexported fields
}

TODO: refactor MetadataStorage.

func NewMetadataStorage

func NewMetadataStorage(cb func(uint64, uint64, error), snapCount uint64, logger *zap.Logger) *MetadataStorage

func (*MetadataStorage) AddPeer

func (ms *MetadataStorage) AddPeer(nodeID types.NodeID, url string, isLearner bool, cs *raftpb.ConfState, appliedIndex uint64) error

func (*MetadataStorage) AppendLogStreamCommitHistory

func (ms *MetadataStorage) AppendLogStreamCommitHistory(cr *mrpb.LogStreamCommitResults)

func (*MetadataStorage) ApplySnapshot

func (ms *MetadataStorage) ApplySnapshot(snap []byte, snapConfState *raftpb.ConfState, snapIndex uint64) error

func (*MetadataStorage) Clear

func (ms *MetadataStorage) Clear()

func (*MetadataStorage) Close

func (ms *MetadataStorage) Close()

func (*MetadataStorage) GetFirstCommitResults

func (ms *MetadataStorage) GetFirstCommitResults() *mrpb.LogStreamCommitResults

func (*MetadataStorage) GetLastCommitResults

func (ms *MetadataStorage) GetLastCommitResults() *mrpb.LogStreamCommitResults

func (*MetadataStorage) GetLastCommitVersion

func (ms *MetadataStorage) GetLastCommitVersion() types.Version

func (*MetadataStorage) GetLogStreamCommitResults

func (ms *MetadataStorage) GetLogStreamCommitResults() []*mrpb.LogStreamCommitResults

func (*MetadataStorage) GetLogStreams

func (ms *MetadataStorage) GetLogStreams() []*varlogpb.LogStreamDescriptor

func (*MetadataStorage) GetMetadata

func (ms *MetadataStorage) GetMetadata() *varlogpb.MetadataDescriptor

func (*MetadataStorage) GetMinCommitVersion

func (ms *MetadataStorage) GetMinCommitVersion() types.Version

func (*MetadataStorage) GetPeers

func (*MetadataStorage) GetSnapshot

func (ms *MetadataStorage) GetSnapshot() ([]byte, *raftpb.ConfState, uint64)

func (*MetadataStorage) GetSnapshotIndex

func (ms *MetadataStorage) GetSnapshotIndex() uint64

func (*MetadataStorage) GetSortedTopicLogStreamIDs

func (ms *MetadataStorage) GetSortedTopicLogStreamIDs() []TopicLSID

func (*MetadataStorage) GetStorageNodes

func (ms *MetadataStorage) GetStorageNodes() []*varlogpb.StorageNodeDescriptor

func (*MetadataStorage) IsLearner

func (ms *MetadataStorage) IsLearner(nodeID types.NodeID) bool

func (*MetadataStorage) IsMember

func (ms *MetadataStorage) IsMember(nodeID types.NodeID) bool

func (*MetadataStorage) Leader

func (ms *MetadataStorage) Leader() types.NodeID

func (*MetadataStorage) LookupEndpoint

func (ms *MetadataStorage) LookupEndpoint(nodeID types.NodeID) string

func (*MetadataStorage) LookupLogStream

func (ms *MetadataStorage) LookupLogStream(lsID types.LogStreamID) *varlogpb.LogStreamDescriptor

func (*MetadataStorage) LookupNextCommitResults

func (ms *MetadataStorage) LookupNextCommitResults(ver types.Version) (*mrpb.LogStreamCommitResults, error)

func (*MetadataStorage) LookupStorageNode

func (ms *MetadataStorage) LookupStorageNode(snID types.StorageNodeID) *varlogpb.StorageNodeDescriptor

func (*MetadataStorage) LookupUncommitReport

func (ms *MetadataStorage) LookupUncommitReport(lsID types.LogStreamID, snID types.StorageNodeID) (snpb.LogStreamUncommitReport, bool)

func (*MetadataStorage) LookupUncommitReports

func (ms *MetadataStorage) LookupUncommitReports(lsID types.LogStreamID) *mrpb.LogStreamUncommitReports

func (*MetadataStorage) NumUpdateSinceCommit

func (ms *MetadataStorage) NumUpdateSinceCommit() uint64

func (*MetadataStorage) RecoverStateMachine

func (ms *MetadataStorage) RecoverStateMachine(stateMachine *mrpb.MetadataRepositoryDescriptor, appliedIndex, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RegisterEndpoint

func (ms *MetadataStorage) RegisterEndpoint(nodeID types.NodeID, url string, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RegisterLogStream

func (ms *MetadataStorage) RegisterLogStream(ls *varlogpb.LogStreamDescriptor, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RegisterStorageNode

func (ms *MetadataStorage) RegisterStorageNode(sn *varlogpb.StorageNodeDescriptor, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RegisterTopic

func (ms *MetadataStorage) RegisterTopic(topic *varlogpb.TopicDescriptor, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) RemovePeer

func (ms *MetadataStorage) RemovePeer(nodeID types.NodeID, cs *raftpb.ConfState, appliedIndex uint64) error

func (*MetadataStorage) ResetUpdateSinceCommit

func (ms *MetadataStorage) ResetUpdateSinceCommit()

func (*MetadataStorage) Run

func (ms *MetadataStorage) Run()

func (*MetadataStorage) SealLogStream

func (ms *MetadataStorage) SealLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) SealingLogStream

func (ms *MetadataStorage) SealingLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) SetLeader

func (ms *MetadataStorage) SetLeader(nodeID types.NodeID)

func (*MetadataStorage) TrimLogStreamCommitHistory

func (ms *MetadataStorage) TrimLogStreamCommitHistory(ver types.Version) error

func (*MetadataStorage) UnregisterLogStream

func (ms *MetadataStorage) UnregisterLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UnregisterStorageNode

func (ms *MetadataStorage) UnregisterStorageNode(snID types.StorageNodeID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UnregisterTopic

func (ms *MetadataStorage) UnregisterTopic(topicID types.TopicID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UnsealLogStream

func (ms *MetadataStorage) UnsealLogStream(lsID types.LogStreamID, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UpdateAppliedIndex

func (ms *MetadataStorage) UpdateAppliedIndex(appliedIndex uint64)

func (*MetadataStorage) UpdateLogStream

func (ms *MetadataStorage) UpdateLogStream(ls *varlogpb.LogStreamDescriptor, nodeIndex, requestIndex uint64) error

func (*MetadataStorage) UpdateUncommitReport

func (ms *MetadataStorage) UpdateUncommitReport(lsID types.LogStreamID, snID types.StorageNodeID, s snpb.LogStreamUncommitReport)

type RaftLogger

// RaftLogger embeds a *zap.SugaredLogger, so the sugared logger's methods are
// promoted onto it, and it additionally declares Warning and Warningf.
// NOTE(review): presumably this adapts zap to the logger interface expected
// by the raft library; confirm where a RaftLogger is installed.
type RaftLogger struct {
	*zap.SugaredLogger
}

func NewRaftLogger

func NewRaftLogger(logger *zap.Logger) *RaftLogger

func (*RaftLogger) Warning

func (l *RaftLogger) Warning(v ...interface{})

func (*RaftLogger) Warningf

func (l *RaftLogger) Warningf(template string, args ...interface{})

type RaftMetadataRepository

type RaftMetadataRepository struct {
	// contains filtered or unexported fields
}

func NewRaftMetadataRepository

func NewRaftMetadataRepository(options *MetadataRepositoryOptions) *RaftMetadataRepository

func (*RaftMetadataRepository) AddPeer

func (mr *RaftMetadataRepository) AddPeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID, url string) error

func (*RaftMetadataRepository) Close

func (mr *RaftMetadataRepository) Close() error

TODO: handle pending messages.

func (*RaftMetadataRepository) GetClusterInfo

func (*RaftMetadataRepository) GetLastCommitResults

func (mr *RaftMetadataRepository) GetLastCommitResults() *mrpb.LogStreamCommitResults

func (*RaftMetadataRepository) GetLastCommitVersion

func (mr *RaftMetadataRepository) GetLastCommitVersion() types.Version

func (*RaftMetadataRepository) GetMetadata

func (*RaftMetadataRepository) GetOldestCommitVersion

func (mr *RaftMetadataRepository) GetOldestCommitVersion() types.Version

func (*RaftMetadataRepository) GetReportCount

func (mr *RaftMetadataRepository) GetReportCount() uint64

func (*RaftMetadataRepository) GetReporterClient

func (*RaftMetadataRepository) GetServerAddr

func (mr *RaftMetadataRepository) GetServerAddr() string

func (*RaftMetadataRepository) IsLearner

func (mr *RaftMetadataRepository) IsLearner() bool

func (*RaftMetadataRepository) IsMember

func (mr *RaftMetadataRepository) IsMember() bool

func (*RaftMetadataRepository) LookupNextCommitResults

func (mr *RaftMetadataRepository) LookupNextCommitResults(ver types.Version) (*mrpb.LogStreamCommitResults, error)

func (*RaftMetadataRepository) ProposeReport

func (*RaftMetadataRepository) RegisterLogStream

func (mr *RaftMetadataRepository) RegisterLogStream(ctx context.Context, ls *varlogpb.LogStreamDescriptor) error

func (*RaftMetadataRepository) RegisterStorageNode

func (mr *RaftMetadataRepository) RegisterStorageNode(ctx context.Context, sn *varlogpb.StorageNodeDescriptor) error

func (*RaftMetadataRepository) RegisterTopic

func (mr *RaftMetadataRepository) RegisterTopic(ctx context.Context, topicID types.TopicID) error

func (*RaftMetadataRepository) RemovePeer

func (mr *RaftMetadataRepository) RemovePeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID) error

func (*RaftMetadataRepository) Run

func (mr *RaftMetadataRepository) Run()

func (*RaftMetadataRepository) Seal

func (*RaftMetadataRepository) UnregisterLogStream

func (mr *RaftMetadataRepository) UnregisterLogStream(ctx context.Context, lsID types.LogStreamID) error

func (*RaftMetadataRepository) UnregisterStorageNode

func (mr *RaftMetadataRepository) UnregisterStorageNode(ctx context.Context, snID types.StorageNodeID) error

func (*RaftMetadataRepository) UnregisterTopic

func (mr *RaftMetadataRepository) UnregisterTopic(ctx context.Context, topicID types.TopicID) error

func (*RaftMetadataRepository) Unseal

func (*RaftMetadataRepository) UpdateLogStream

func (*RaftMetadataRepository) Wait

func (mr *RaftMetadataRepository) Wait()

type RaftOptions

// RaftOptions groups the Raft-related settings. It is embedded in
// MetadataRepositoryOptions.
//
// NOTE(review): the following readings are inferred from names only; confirm
// at the use sites. Join presumably selects joining an existing cluster
// rather than starting a new one. UnsafeNoWal presumably disables the
// write-ahead log. SnapCount, SnapCatchUpCount, MaxSnapPurgeCount,
// MaxWalPurgeCount, RaftTick and RaftDir presumably default to
// DefaultSnapshotCount, DefaultSnapshotCatchUpCount,
// DefaultSnapshotPurgeCount, DefaultWalPurgeCount, DefaultRaftTick and
// DefaultRaftDir, whose types match these fields.
type RaftOptions struct {
	NodeID            types.NodeID
	Join              bool
	UnsafeNoWal       bool
	SnapCount         uint64
	SnapCatchUpCount  uint64
	MaxSnapPurgeCount uint
	MaxWalPurgeCount  uint
	RaftTick          time.Duration
	RaftDir           string
	Peers             []string
}

type ReportCollector

// ReportCollector is the report-collection interface.
// NOTE(review): NewReportCollector returns the unexported *reportCollector,
// which presumably implements this interface; confirm.
type ReportCollector interface {
	// Run starts the collector and returns an error on failure.
	Run() error

	// Reset takes no arguments and returns nothing.
	// NOTE(review): what state is reset is not visible here; confirm.
	Reset()

	// Close shuts the collector down.
	Close()

	// Recover takes storage node descriptors, log stream descriptors and a
	// version.
	// NOTE(review): presumably rebuilds collector state from them; confirm.
	Recover([]*varlogpb.StorageNodeDescriptor, []*varlogpb.LogStreamDescriptor, types.Version) error

	// RegisterStorageNode registers the described storage node.
	RegisterStorageNode(*varlogpb.StorageNodeDescriptor) error

	// UnregisterStorageNode unregisters the storage node with the given ID.
	UnregisterStorageNode(types.StorageNodeID) error

	// RegisterLogStream registers a log stream, identified by topic ID,
	// storage node ID and log stream ID, together with a version and a log
	// stream status.
	RegisterLogStream(types.TopicID, types.StorageNodeID, types.LogStreamID, types.Version, varlogpb.LogStreamStatus) error

	// UnregisterLogStream unregisters the given log stream of the given
	// storage node.
	UnregisterLogStream(types.StorageNodeID, types.LogStreamID) error

	// Commit takes no arguments and returns nothing.
	// NOTE(review): presumably triggers delivery of commit results; confirm.
	Commit()

	// Seal seals the log stream with the given ID.
	Seal(types.LogStreamID)

	// Unseal unseals the log stream with the given ID at the given version.
	Unseal(types.LogStreamID, types.Version)

	// NumExecutors returns a count of executors.
	NumExecutors() int

	// NumCommitter returns a count of committers.
	NumCommitter() int
}

type ReportCollectorHelper

// ReportCollectorHelper is the set of callbacks handed to NewReportCollector.
// RaftMetadataRepository declares methods with all four of these names.
type ReportCollectorHelper interface {
	// ProposeReport submits the uncommit reports of the given storage node.
	ProposeReport(types.StorageNodeID, []snpb.LogStreamUncommitReport) error

	// GetReporterClient returns a report committer client for the described
	// storage node, or an error.
	GetReporterClient(context.Context, *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)

	// GetLastCommitResults returns the last commit results.
	GetLastCommitResults() *mrpb.LogStreamCommitResults

	// LookupNextCommitResults returns the commit results that follow the given
	// version, or an error.
	LookupNextCommitResults(types.Version) (*mrpb.LogStreamCommitResults, error)
}

type ReporterClientFactory

// ReporterClientFactory creates report committer clients. It is the type of
// the MetadataRepositoryOptions.ReporterClientFac field.
type ReporterClientFactory interface {
	// GetReporterClient returns a report committer client for the described
	// storage node, or an error.
	GetReporterClient(context.Context, *varlogpb.StorageNodeDescriptor) (reportcommitter.Client, error)
}

type SnapshotGetter

// SnapshotGetter gives read access to a snapshot. MetadataStorage declares
// both methods with identical signatures.
type SnapshotGetter interface {
	// GetSnapshotIndex returns the snapshot index.
	GetSnapshotIndex() uint64

	// GetSnapshot returns a byte slice, a Raft ConfState and a uint64.
	// NOTE(review): presumably the encoded snapshot, the configuration state
	// at the snapshot, and the snapshot index (cf. the snap, snapConfState and
	// snapIndex parameters of MetadataStorage.ApplySnapshot); confirm.
	GetSnapshot() ([]byte, *raftpb.ConfState, uint64)
}

type TelemetryOptions

// TelemetryOptions holds the name and endpoint of a telemetry collector. It
// is embedded in MetadataRepositoryOptions.
// NOTE(review): the defaults are presumably DefaultTelemetryCollectorName
// ("nop") and DefaultTelmetryCollectorEndpoint ("localhost:55680"); confirm.
type TelemetryOptions struct {
	CollectorName     string
	CollectorEndpoint string
}

type TopicLSID

// TopicLSID pairs a topic ID with a log stream ID. It is the element type of
// the slice returned by MetadataStorage.GetSortedTopicLogStreamIDs.
type TopicLSID struct {
	TopicID     types.TopicID
	LogStreamID types.LogStreamID
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL