Documentation ¶
Index ¶
- Constants
- func TestGetStorage(t *testing.T, lse *Executor) *storage.Storage
- func TestNewBatchData(tb testing.TB, batchLen int, msgSize int) [][]byte
- func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())
- func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func())
- type Executor
- func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error)
- func (lse *Executor) Close() (err error)
- func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamCommitResult) error
- func (lse *Executor) Metadata() (snpb.LogStreamReplicaMetadataDescriptor, error)
- func (lse *Executor) Metrics() *telemetry.LogStreamMetrics
- func (lse *Executor) Path() string
- func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error
- func (lse *Executor) Report(_ context.Context) (report snpb.LogStreamUncommitReport, err error)
- func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (status varlogpb.LogStreamStatus, localHWM types.GLSN, err error)
- func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)
- func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)
- func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)
- func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamReplica, ...) (syncRange snpb.SyncRange, err error)
- func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, ...) (err error)
- func (lse *Executor) Trim(_ context.Context, glsn types.GLSN) error
- func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamReplica) (err error)
- type ExecutorOption
- func WithAdvertiseAddress(advertiseAddress string) ExecutorOption
- func WithClusterID(cid types.ClusterID) ExecutorOption
- func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption
- func WithLogStreamID(lsid types.LogStreamID) ExecutorOption
- func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption
- func WithLogger(logger *zap.Logger) ExecutorOption
- func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption
- func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption
- func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption
- func WithStorage(stg *storage.Storage) ExecutorOption
- func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption
- func WithSyncTimeout(syncTimeout time.Duration) ExecutorOption
- func WithTopicID(tpid types.TopicID) ExecutorOption
- func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption
- type SubscribeResult
Constants ¶
const (
	DefaultSequenceQueueCapacity        = 1024
	DefaultWriteQueueCapacity           = 1024
	DefaultCommitQueueCapacity          = 1024
	DefaultReplicateClientQueueCapacity = 1024
	DefaultSyncTimeout                  = 10 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
func TestGetStorage ¶ added in v0.7.0
func TestNewReplicateServer ¶
func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())
func TestNewReplicatorClient ¶ added in v0.5.0
func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func())
Types ¶
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error)
func (*Executor) Metadata ¶
func (lse *Executor) Metadata() (snpb.LogStreamReplicaMetadataDescriptor, error)
func (*Executor) Metrics ¶
func (lse *Executor) Metrics() *telemetry.LogStreamMetrics
func (*Executor) Path ¶ added in v0.5.0
Path returns the data directory where the replica stores its data.
func (*Executor) SubscribeWithGLSN ¶
func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)
SubscribeWithGLSN subscribes to the log stream with the given range of GLSNs. TODO: The first argument ctx may not be necessary, since the subscription can be stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`. (Note: the signature shown above takes no ctx argument, so this TODO appears to be outdated.)
func (*Executor) SubscribeWithLLSN ¶
func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)
SubscribeWithLLSN subscribes to the log stream with the given range of LLSNs. TODO: The first argument ctx may not be necessary, since the subscription can be stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`. (Note: the signature shown above takes no ctx argument, so this TODO appears to be outdated.)
func (*Executor) Sync ¶
func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)
func (*Executor) SyncReplicate ¶
func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, payload snpb.SyncPayload) (err error)
type ExecutorOption ¶
type ExecutorOption interface {
// contains filtered or unexported methods
}
func WithAdvertiseAddress ¶
func WithAdvertiseAddress(advertiseAddress string) ExecutorOption
func WithClusterID ¶
func WithClusterID(cid types.ClusterID) ExecutorOption
func WithCommitQueueCapacity ¶
func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption
func WithLogStreamID ¶
func WithLogStreamID(lsid types.LogStreamID) ExecutorOption
func WithLogStreamMetrics ¶
func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption
func WithLogger ¶
func WithLogger(logger *zap.Logger) ExecutorOption
func WithReplicateClientGRPCOptions ¶
func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption
func WithReplicateClientQueueCapacity ¶
func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption
func WithSequenceQueueCapacity ¶
func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption
func WithStorage ¶
func WithStorage(stg *storage.Storage) ExecutorOption
func WithStorageNodeID ¶
func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption
func WithSyncTimeout ¶ added in v0.7.0
func WithSyncTimeout(syncTimeout time.Duration) ExecutorOption
WithSyncTimeout sets the timeout for synchronization in the destination replica. If the destination replica doesn't receive a SyncReplicate RPC within syncTimeout, another SyncInit RPC can cancel the synchronization.
func WithTopicID ¶
func WithTopicID(tpid types.TopicID) ExecutorOption
func WithWriteQueueCapacity ¶
func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption
type SubscribeResult ¶
type SubscribeResult struct {
// contains filtered or unexported fields
}
func (*SubscribeResult) Err ¶
func (sr *SubscribeResult) Err() error
Err returns the error of the subscription. This should be called after Stop is called.
func (*SubscribeResult) Result ¶
func (sr *SubscribeResult) Result() <-chan varlogpb.LogEntry
Result returns the channel of the result.
func (*SubscribeResult) Stop ¶
func (sr *SubscribeResult) Stop()
Stop stops the subscription. This should be called to release resources after using SubscribeResult.