logstream

package
v0.7.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2022 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSequenceQueueCapacity        = 1024
	DefaultWriteQueueCapacity           = 1024
	DefaultCommitQueueCapacity          = 1024
	DefaultReplicateClientQueueCapacity = 1024
	DefaultSyncTimeout                  = 10 * time.Second
)

Variables

This section is empty.

Functions

func TestGetStorage added in v0.7.0

func TestGetStorage(t *testing.T, lse *Executor) *storage.Storage

func TestNewBatchData

func TestNewBatchData(tb testing.TB, batchLen int, msgSize int) [][]byte

func TestNewReplicateServer

func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())

func TestNewReplicatorClient added in v0.5.0

func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func())

Types

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error)

func (*Executor) Append

func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error)

Append appends a batch of logs to the log stream.

func (*Executor) Close

func (lse *Executor) Close() (err error)

func (*Executor) Commit

func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamCommitResult) error

func (*Executor) LogStreamMetadata

func (lse *Executor) LogStreamMetadata() (lsd varlogpb.LogStreamDescriptor, err error)

func (*Executor) Metadata

func (*Executor) Metrics

func (lse *Executor) Metrics() *telemetry.LogStreamMetrics

func (*Executor) Path added in v0.5.0

func (lse *Executor) Path() string

Path returns the data directory where the replica stores its data.

func (*Executor) Replicate

func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error

func (*Executor) Report

func (lse *Executor) Report(_ context.Context) (report snpb.LogStreamUncommitReport, err error)

func (*Executor) Seal

func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (status varlogpb.LogStreamStatus, localHWM types.GLSN, err error)

func (*Executor) SubscribeWithGLSN

func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)

SubscribeWithGLSN subscribes to the log stream with the given range of GLSNs. TODO: This note originally said that a first argument ctx may not be necessary; the signature shown here takes no ctx, and the subscription can be stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`.

func (*Executor) SubscribeWithLLSN

func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)

SubscribeWithLLSN subscribes to the log stream with the given range of LLSNs. TODO: This note originally said that a first argument ctx may not be necessary; the signature shown here takes no ctx, and the subscription can be stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`.

func (*Executor) Sync

func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)

func (*Executor) SyncInit

func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamReplica, srcRange snpb.SyncRange) (syncRange snpb.SyncRange, err error)

func (*Executor) SyncReplicate

func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, payload snpb.SyncPayload) (err error)

func (*Executor) Trim

func (lse *Executor) Trim(_ context.Context, glsn types.GLSN) error

func (*Executor) Unseal

func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamReplica) (err error)

type ExecutorOption

type ExecutorOption interface {
	// contains filtered or unexported methods
}

func WithAdvertiseAddress

func WithAdvertiseAddress(advertiseAddress string) ExecutorOption

func WithClusterID

func WithClusterID(cid types.ClusterID) ExecutorOption

func WithCommitQueueCapacity

func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption

func WithLogStreamID

func WithLogStreamID(lsid types.LogStreamID) ExecutorOption

func WithLogStreamMetrics

func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption

func WithLogger

func WithLogger(logger *zap.Logger) ExecutorOption

func WithReplicateClientGRPCOptions

func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption

func WithReplicateClientQueueCapacity

func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption

func WithSequenceQueueCapacity

func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption

func WithStorage

func WithStorage(stg *storage.Storage) ExecutorOption

func WithStorageNodeID

func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption

func WithSyncTimeout added in v0.7.0

func WithSyncTimeout(syncTimeout time.Duration) ExecutorOption

WithSyncTimeout sets the timeout for synchronization in the destination replica. If the destination replica doesn't receive a SyncReplicate RPC within syncTimeout, another SyncInit RPC can cancel the synchronization.

func WithTopicID

func WithTopicID(tpid types.TopicID) ExecutorOption

func WithWriteQueueCapacity

func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption

type SubscribeResult

type SubscribeResult struct {
	// contains filtered or unexported fields
}

func (*SubscribeResult) Err

func (sr *SubscribeResult) Err() error

Err returns the error of the subscription. This should be called after Stop is called.

func (*SubscribeResult) Result

func (sr *SubscribeResult) Result() <-chan varlogpb.LogEntry

Result returns the channel of the result.

func (*SubscribeResult) Stop

func (sr *SubscribeResult) Stop()

Stop stops the subscription. This should be called to release resources after using SubscribeResult.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL