logstream

package
v0.23.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2024 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSequenceQueueCapacity        = 1024
	DefaultWriteQueueCapacity           = 1024
	DefaultCommitQueueCapacity          = 1024
	DefaultReplicateClientQueueCapacity = 1024
	DefaultSyncTimeout                  = 10 * time.Second
)

Variables

This section is empty.

Functions

func TestGetReportCommitBase added in v0.14.0

func TestGetReportCommitBase(t *testing.T, lse *Executor) (commitVersion types.Version, highWatermark types.GLSN, uncommittedBegin varlogpb.LogSequenceNumber, invalid bool)

func TestGetStorage added in v0.7.0

func TestGetStorage(t *testing.T, lse *Executor) *storage.Storage

func TestGetUncommittedLLSNEnd added in v0.14.0

func TestGetUncommittedLLSNEnd(t *testing.T, lse *Executor) types.LLSN

func TestNewBatchData

func TestNewBatchData(tb testing.TB, batchLen int, msgSize int) [][]byte

func TestNewReplicateServer

func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func())

func TestNewReplicatorClient added in v0.5.0

func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func())

Types

type AppendTask added in v0.14.0

type AppendTask struct {
	LogStreamID  types.LogStreamID
	RPCStartTime time.Time
	// contains filtered or unexported fields
}

func NewAppendTask added in v0.14.0

func NewAppendTask() *AppendTask

func (*AppendTask) Release added in v0.14.0

func (at *AppendTask) Release()

func (*AppendTask) ReleaseWriteWaitGroups added in v0.14.0

func (at *AppendTask) ReleaseWriteWaitGroups()

func (*AppendTask) SetError added in v0.14.0

func (at *AppendTask) SetError(err error)

func (*AppendTask) WaitForCompletion added in v0.14.0

func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendResult, err error)

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error)

func (*Executor) Append

func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error)

Append appends a batch of logs to the log stream.

func (*Executor) AppendAsync added in v0.14.0

func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, appendTask *AppendTask) error

func (*Executor) Close

func (lse *Executor) Close() (err error)

func (*Executor) Commit

func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamCommitResult) error

func (*Executor) Metadata

func (*Executor) Metrics

func (lse *Executor) Metrics() *telemetry.LogStreamMetrics

func (*Executor) Path added in v0.5.0

func (lse *Executor) Path() string

Path returns the data directory where the replica stores its data.

func (*Executor) Replicate

func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error

func (*Executor) Report

func (lse *Executor) Report(_ context.Context) (report snpb.LogStreamUncommitReport, err error)

func (*Executor) Seal

func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (status varlogpb.LogStreamStatus, localHWM types.GLSN, err error)

Seal checks whether the log stream replica has committed the log entry most recently confirmed by the metadata repository. Because Trim can remove the last log entry, Seal compares the argument lastCommittedGLSN with the uncommittedBegin of the log stream context rather than with the local high watermark.

FIXME: No need to return localHWM.

func (*Executor) SubscribeWithGLSN

func (lse *Executor) SubscribeWithGLSN(begin, end types.GLSN) (*SubscribeResult, error)

SubscribeWithGLSN subscribes to the log stream with the given range of GLSNs. Note: the original TODO here questioned whether a leading ctx argument was necessary; the signature shown above takes no ctx, and the subscription can be stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`.

func (*Executor) SubscribeWithLLSN

func (lse *Executor) SubscribeWithLLSN(begin, end types.LLSN) (*SubscribeResult, error)

SubscribeWithLLSN subscribes to the log stream with the given range of LLSNs. Note: the original TODO here questioned whether a leading ctx argument was necessary; the signature shown above takes no ctx, and the subscription can be stopped by calling `internal/varlogsn/logstream.(*SubscribeResult).Stop()`.

func (*Executor) Sync

func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamReplica) (*snpb.SyncStatus, error)

func (*Executor) SyncInit

func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamReplica, srcRange snpb.SyncRange, srcLastCommittedLLSN types.LLSN) (syncRange snpb.SyncRange, err error)

func (*Executor) SyncReplicate

func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStreamReplica, payload snpb.SyncPayload) (err error)

func (*Executor) Trim

func (lse *Executor) Trim(_ context.Context, glsn types.GLSN) error

func (*Executor) Unseal

func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamReplica) (err error)

type ExecutorOption

type ExecutorOption interface {
	// contains filtered or unexported methods
}

func WithAdvertiseAddress

func WithAdvertiseAddress(advertiseAddress string) ExecutorOption

func WithClusterID

func WithClusterID(cid types.ClusterID) ExecutorOption

func WithCommitQueueCapacity

func WithCommitQueueCapacity(commitQueueCapacity int) ExecutorOption

func WithLogStreamID

func WithLogStreamID(lsid types.LogStreamID) ExecutorOption

func WithLogStreamMetrics

func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption

func WithLogger

func WithLogger(logger *zap.Logger) ExecutorOption

func WithReplicateClientGRPCOptions

func WithReplicateClientGRPCOptions(replicateClientGRPCOptions ...grpc.DialOption) ExecutorOption

func WithReplicateClientQueueCapacity

func WithReplicateClientQueueCapacity(replicateClientQueueCapacity int) ExecutorOption

func WithSequenceQueueCapacity

func WithSequenceQueueCapacity(sequenceQueueCapacity int) ExecutorOption

func WithStorage

func WithStorage(stg *storage.Storage) ExecutorOption

func WithStorageNodeID

func WithStorageNodeID(snid types.StorageNodeID) ExecutorOption

func WithSyncTimeout added in v0.7.0

func WithSyncTimeout(syncTimeout time.Duration) ExecutorOption

WithSyncTimeout sets the timeout for synchronization in the destination replica. If the destination replica does not receive a SyncReplicate RPC within syncTimeout, another SyncInit RPC can cancel the synchronization.

func WithTopicID

func WithTopicID(tpid types.TopicID) ExecutorOption

func WithWriteQueueCapacity

func WithWriteQueueCapacity(writeQueueCapacity int) ExecutorOption

type SubscribeResult

type SubscribeResult struct {
	// contains filtered or unexported fields
}

func (*SubscribeResult) Err

func (sr *SubscribeResult) Err() error

Err returns the error of the subscription. This should be called after Stop is called.

func (*SubscribeResult) Result

func (sr *SubscribeResult) Result() <-chan varlogpb.LogEntry

Result returns the channel of the result.

func (*SubscribeResult) Stop

func (sr *SubscribeResult) Stop()

Stop stops the subscription. This should be called to release resources after using SubscribeResult.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL