Documentation ¶
Index ¶
- Constants
- func TestGetReportCommitBase(t *testing.T, lse *Executor) (commitVersion types.Version, highWatermark types.GLSN, ...)
- func TestGetStorage(t *testing.T, lse *Executor) *storage.Storage
- func TestGetUncommittedLLSNEnd(t *testing.T, lse *Executor) types.LLSN
- func TestNewBatchData(tb testing.TB, batchLen int, msgSize int) [][]byte
- func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())
- func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func())
- type AppendTask
- type Executor
- func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error)
- func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, appendTask *AppendTask) error
- func (lse *Executor) Close() (err error)
- func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamCommitResult) error
- func (lse *Executor) Metadata() (snpb.LogStreamReplicaMetadataDescriptor, error)
- func (lse *Executor) Metrics() *telemetry.LogStreamMetrics
- func (lse *Executor) Path() string
- func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error
- func (lse *Executor) Report(_ context.Context) (report snpb.LogStreamUncommitReport, err error)
- func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (status varlogpb.LogStreamStatus, localHWM types.GLSN, err error)
- func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)
- func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)
- func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)
- func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamReplica, ...) (syncRange snpb.SyncRange, err error)
- func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, ...) (err error)
- func (lse *Executor) Trim(_ context.Context, glsn types.GLSN) error
- func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamReplica) (err error)
- type ExecutorOption
- func WithAdvertiseAddress(advertiseAddress string) ExecutorOption
- func WithClusterID(cid types.ClusterID) ExecutorOption
- func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption
- func WithLogStreamID(lsid types.LogStreamID) ExecutorOption
- func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption
- func WithLogger(logger *zap.Logger) ExecutorOption
- func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption
- func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption
- func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption
- func WithStorage(stg *storage.Storage) ExecutorOption
- func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption
- func WithSyncTimeout(syncTimeout time.Duration) ExecutorOption
- func WithTopicID(tpid types.TopicID) ExecutorOption
- func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption
- type SubscribeResult
Constants ¶
const ( DefaultSequenceQueueCapacity = 1024 DefaultWriteQueueCapacity = 1024 DefaultCommitQueueCapacity = 1024 DefaultReplicateClientQueueCapacity = 1024 DefaultSyncTimeout = 10 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func TestGetReportCommitBase ¶ added in v0.14.0
func TestGetStorage ¶ added in v0.7.0
func TestGetUncommittedLLSNEnd ¶ added in v0.14.0
func TestNewReplicateServer ¶
func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())
func TestNewReplicatorClient ¶ added in v0.5.0
func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func())
Types ¶
type AppendTask ¶ added in v0.14.0
type AppendTask struct {
// contains filtered or unexported fields
}
func NewAppendTask ¶ added in v0.14.0
func NewAppendTask() *AppendTask
func (*AppendTask) Release ¶ added in v0.14.0
func (at *AppendTask) Release()
func (*AppendTask) ReleaseWriteWaitGroups ¶ added in v0.14.0
func (at *AppendTask) ReleaseWriteWaitGroups()
func (*AppendTask) SetError ¶ added in v0.14.0
func (at *AppendTask) SetError(err error)
func (*AppendTask) WaitForCompletion ¶ added in v0.14.0
func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendResult, err error)
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error)
func (*Executor) AppendAsync ¶ added in v0.14.0
func (*Executor) Metadata ¶
func (lse *Executor) Metadata() (snpb.LogStreamReplicaMetadataDescriptor, error)
func (*Executor) Metrics ¶
func (lse *Executor) Metrics() *telemetry.LogStreamMetrics
func (*Executor) Path ¶ added in v0.5.0
Path returns the data directory where the replica stores its data.
func (*Executor) Seal ¶
func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (status varlogpb.LogStreamStatus, localHWM types.GLSN, err error)
Seal checks whether the log stream replica has committed the log entry most recently confirmed by the metadata repository. Since Trim can remove the last log entry, Seal compares the argument lastCommittedGLSN to the uncommittedBegin of the log stream context instead of the local high watermark.
FIXME: No need to return localHWM.
func (*Executor) SubscribeWithGLSN ¶
func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)
SubscribeWithGLSN subscribes to the log stream with the given range of GLSNs. The method takes no context argument; the subscription is stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`.
func (*Executor) SubscribeWithLLSN ¶
func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)
SubscribeWithLLSN subscribes to the log stream with the given range of LLSNs. The method takes no context argument; the subscription is stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`.
func (*Executor) Sync ¶
func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)
func (*Executor) SyncReplicate ¶
func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, payload snpb.SyncPayload) (err error)
type ExecutorOption ¶
type ExecutorOption interface {
// contains filtered or unexported methods
}
func WithAdvertiseAddress ¶
func WithAdvertiseAddress(advertiseAddress string) ExecutorOption
func WithClusterID ¶
func WithClusterID(cid types.ClusterID) ExecutorOption
func WithCommitQueueCapacity ¶
func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption
func WithLogStreamID ¶
func WithLogStreamID(lsid types.LogStreamID) ExecutorOption
func WithLogStreamMetrics ¶
func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption
func WithLogger ¶
func WithLogger(logger *zap.Logger) ExecutorOption
func WithReplicateClientGRPCOptions ¶
func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption
func WithReplicateClientQueueCapacity ¶
func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption
func WithSequenceQueueCapacity ¶
func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption
func WithStorage ¶
func WithStorage(stg *storage.Storage) ExecutorOption
func WithStorageNodeID ¶
func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption
func WithSyncTimeout ¶ added in v0.7.0
func WithSyncTimeout(syncTimeout time.Duration) ExecutorOption
WithSyncTimeout sets the timeout for synchronization in the destination replica. If the destination replica doesn't receive a SyncReplicate RPC within syncTimeout, another SyncInit RPC can cancel the synchronization.
func WithTopicID ¶
func WithTopicID(tpid types.TopicID) ExecutorOption
func WithWriteQueueCapacity ¶
func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption
type SubscribeResult ¶
type SubscribeResult struct {
// contains filtered or unexported fields
}
func (*SubscribeResult) Err ¶
func (sr *SubscribeResult) Err() error
Err returns the error of the subscription. This should be called after Stop is called.
func (*SubscribeResult) Result ¶
func (sr *SubscribeResult) Result() <-chan varlogpb.LogEntry
Result returns the channel of the result.
func (*SubscribeResult) Stop ¶
func (sr *SubscribeResult) Stop()
Stop stops the subscription. This should be called to release resources after using SubscribeResult.