logstream

package
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2022 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSequenceQueueCapacity        = 1024
	DefaultWriteQueueCapacity           = 1024
	DefaultCommitQueueCapacity          = 1024
	DefaultReplicateClientQueueCapacity = 1024
	DefaultSyncInitTimeout              = 10 * time.Second
)

Variables

This section is empty.

Functions

func TestNewBatchData

func TestNewBatchData(tb testing.TB, batchLen int, msgSize int) [][]byte

func TestNewReplicateServer

func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())

Types

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error)

func (*Executor) Append

func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error)

Append appends a batch of logs to the log stream.

func (*Executor) Close

func (lse *Executor) Close() (err error)

func (*Executor) Commit

func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamCommitResult) error

func (*Executor) LogStreamMetadata

func (lse *Executor) LogStreamMetadata() (lsd varlogpb.LogStreamDescriptor, err error)

func (*Executor) Metadata

func (*Executor) Metrics

func (lse *Executor) Metrics() *telemetry.LogStreamMetrics

func (*Executor) Replicate

func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error

func (*Executor) Report

func (*Executor) Seal

func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (status varlogpb.LogStreamStatus, localHWM types.GLSN, err error)

func (*Executor) SubscribeWithGLSN

func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)

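SubscribeWithGLSN subscribes to the log stream with the given range of GLSNs. The subscription can be stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`. TODO: An earlier note here said that the first argument ctx may not be necessary; that note appears to be stale, since the signature takes only begin and end and has no ctx parameter.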

func (*Executor) SubscribeWithLLSN

func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)

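SubscribeWithLLSN subscribes to the log stream with the given range of LLSNs. The subscription can be stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`. TODO: An earlier note here said that the first argument ctx may not be necessary; that note appears to be stale, since the signature takes only begin and end and has no ctx parameter.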

func (*Executor) Sync

func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)

func (*Executor) SyncInit

func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamReplica, srcRange snpb.SyncRange) (syncRange snpb.SyncRange, err error)

func (*Executor) SyncReplicate

func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, payload snpb.SyncPayload) (err error)

func (*Executor) Trim

func (lse *Executor) Trim(_ context.Context, glsn types.GLSN) error

func (*Executor) Unseal

func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamReplica) (err error)

type ExecutorOption

type ExecutorOption interface {
	// contains filtered or unexported methods
}

func WithAdvertiseAddress

func WithAdvertiseAddress(advertiseAddress string) ExecutorOption

func WithClusterID

func WithClusterID(cid types.ClusterID) ExecutorOption

func WithCommitQueueCapacity

func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption

func WithLogStreamID

func WithLogStreamID(lsid types.LogStreamID) ExecutorOption

func WithLogStreamMetrics

func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption

func WithLogger

func WithLogger(logger *zap.Logger) ExecutorOption

func WithReplicateClientGRPCOptions

func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption

func WithReplicateClientQueueCapacity

func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption

func WithSequenceQueueCapacity

func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption

func WithStorage

func WithStorage(stg *storage.Storage) ExecutorOption

func WithStorageNodeID

func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption

func WithSyncInitTimeout

func WithSyncInitTimeout(syncInitTimeout time.Duration) ExecutorOption

func WithTopicID

func WithTopicID(tpid types.TopicID) ExecutorOption

func WithWriteQueueCapacity

func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption

type SubscribeResult

type SubscribeResult struct {
	// contains filtered or unexported fields
}

func (*SubscribeResult) Err

func (sr *SubscribeResult) Err() error

Err returns the error of the subscription. This should be called after Stop is called.

func (*SubscribeResult) Result

func (sr *SubscribeResult) Result() <-chan varlogpb.LogEntry

Result returns the channel of the result.

func (*SubscribeResult) Stop

func (sr *SubscribeResult) Stop()

Stop stops the subscription. This should be called to release resources after using SubscribeResult.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL