Documentation ¶
Index ¶
- Constants
- func TestNewBatchData(tb testing.TB, batchLen int, msgSize int) [][]byte
- func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())
- func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func())
- type Executor
- func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error)
- func (lse *Executor) Close() (err error)
- func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamCommitResult) error
- func (lse *Executor) LogStreamMetadata() (lsd varlogpb.LogStreamDescriptor, err error)
- func (lse *Executor) Metadata() (snpb.LogStreamReplicaMetadataDescriptor, error)
- func (lse *Executor) Metrics() *telemetry.LogStreamMetrics
- func (lse *Executor) Path() string
- func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error
- func (lse *Executor) Report(_ context.Context) (report snpb.LogStreamUncommitReport, err error)
- func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (status varlogpb.LogStreamStatus, localHWM types.GLSN, err error)
- func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)
- func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)
- func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)
- func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamReplica, ...) (syncRange snpb.SyncRange, err error)
- func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, ...) (err error)
- func (lse *Executor) Trim(_ context.Context, glsn types.GLSN) error
- func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamReplica) (err error)
- type ExecutorOption
- func WithAdvertiseAddress(advertiseAddress string) ExecutorOption
- func WithClusterID(cid types.ClusterID) ExecutorOption
- func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption
- func WithLogStreamID(lsid types.LogStreamID) ExecutorOption
- func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption
- func WithLogger(logger *zap.Logger) ExecutorOption
- func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption
- func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption
- func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption
- func WithStorage(stg *storage.Storage) ExecutorOption
- func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption
- func WithSyncInitTimeout(syncInitTimeout time.Duration) ExecutorOption
- func WithTopicID(tpid types.TopicID) ExecutorOption
- func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption
- type SubscribeResult
Constants ¶
const ( DefaultSequenceQueueCapacity = 1024 DefaultWriteQueueCapacity = 1024 DefaultCommitQueueCapacity = 1024 DefaultReplicateClientQueueCapacity = 1024 DefaultSyncInitTimeout = 10 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func TestNewReplicateServer ¶
func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())
func TestNewReplicatorClient ¶ added in v0.5.0
func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func())
Types ¶
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error)
func (*Executor) LogStreamMetadata ¶
func (lse *Executor) LogStreamMetadata() (lsd varlogpb.LogStreamDescriptor, err error)
func (*Executor) Metadata ¶
func (lse *Executor) Metadata() (snpb.LogStreamReplicaMetadataDescriptor, error)
func (*Executor) Metrics ¶
func (lse *Executor) Metrics() *telemetry.LogStreamMetrics
func (*Executor) Path ¶ added in v0.5.0
Path returns the data directory where the replica stores its data.
func (*Executor) SubscribeWithGLSN ¶
func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)
SubscribeWithGLSN subscribes to the log stream with the given range of GLSNs. TODO: The first argument ctx may not be necessary, since the subscription can be stopped by the `internal/varlogsn/logstream.(*SubscribeResult).Stop()`.
func (*Executor) SubscribeWithLLSN ¶
func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)
SubscribeWithLLSN subscribes to the log stream with the given range of LLSNs. TODO: The first argument ctx may not be necessary, since the subscription can be stopped by the `internal/varlogsn/logstream.(*SubscribeResult).Stop()`.
func (*Executor) Sync ¶
func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)
func (*Executor) SyncReplicate ¶
func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, payload snpb.SyncPayload) (err error)
type ExecutorOption ¶
type ExecutorOption interface {
// contains filtered or unexported methods
}
func WithAdvertiseAddress ¶
func WithAdvertiseAddress(advertiseAddress string) ExecutorOption
func WithClusterID ¶
func WithClusterID(cid types.ClusterID) ExecutorOption
func WithCommitQueueCapacity ¶
func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption
func WithLogStreamID ¶
func WithLogStreamID(lsid types.LogStreamID) ExecutorOption
func WithLogStreamMetrics ¶
func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption
func WithLogger ¶
func WithLogger(logger *zap.Logger) ExecutorOption
func WithReplicateClientGRPCOptions ¶
func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption
func WithReplicateClientQueueCapacity ¶
func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption
func WithSequenceQueueCapacity ¶
func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption
func WithStorage ¶
func WithStorage(stg *storage.Storage) ExecutorOption
func WithStorageNodeID ¶
func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption
func WithSyncInitTimeout ¶
func WithSyncInitTimeout(syncInitTimeout time.Duration) ExecutorOption
func WithTopicID ¶
func WithTopicID(tpid types.TopicID) ExecutorOption
func WithWriteQueueCapacity ¶
func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption
type SubscribeResult ¶
type SubscribeResult struct {
// contains filtered or unexported fields
}
func (*SubscribeResult) Err ¶
func (sr *SubscribeResult) Err() error
Err returns the error of the subscription. This should be called after Stop is called.
func (*SubscribeResult) Result ¶
func (sr *SubscribeResult) Result() <-chan varlogpb.LogEntry
Result returns the channel of the result.
func (*SubscribeResult) Stop ¶
func (sr *SubscribeResult) Stop()
Stop stops the subscription. This should be called to release resources after using SubscribeResult.