streamclient

package
v0.0.0-...-6aa601a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2021 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	KB = 1024
	MB = 1024 * KB
	GB = 1024 * MB

	MaxExtentSize = 64 * MB
)
View Source
const (
	HintReadThrough        = 1 << 0
	HintReadFromCache byte = 1 << 1
)

hint

Variables

This section is empty.

Functions

This section is empty.

Types

type AutumnBlockReader

type AutumnBlockReader struct {
	// contains filtered or unexported fields
}

func (*AutumnBlockReader) Read

func (br *AutumnBlockReader) Read(ctx context.Context, extentID uint64, offset uint32, numOfBlocks uint32, hint byte) ([]*pb.Block, uint32, error)

type AutumnEntryIter

type AutumnEntryIter struct {
	// contains filtered or unexported fields
}

func (*AutumnEntryIter) HasNext

func (iter *AutumnEntryIter) HasNext() (bool, error)

func (*AutumnEntryIter) Next

func (iter *AutumnEntryIter) Next() *pb.EntryInfo

type AutumnStreamClient

type AutumnStreamClient struct {
	StreamClient
	// contains filtered or unexported fields
}

for single stream

func NewStreamClient

func NewStreamClient(sm *smclient.SMClient, em *smclient.ExtentManager, streamID uint64, streamLock StreamLock) *AutumnStreamClient

func (*AutumnStreamClient) Append

func (sc *AutumnStreamClient) Append(ctx context.Context, blocks []*pb.Block, mustSync bool) (uint64, []uint32, uint32, error)

func (*AutumnStreamClient) AppendEntries

func (sc *AutumnStreamClient) AppendEntries(ctx context.Context, entries []*pb.EntryInfo, mustSync bool) (uint64, uint32, error)

AppendEntries blocks until success make all entries in the same extentID, and fill entires.

func (*AutumnStreamClient) Close

func (sc *AutumnStreamClient) Close()

func (*AutumnStreamClient) Connect

func (sc *AutumnStreamClient) Connect() error

func (*AutumnStreamClient) End

func (sc *AutumnStreamClient) End() uint32

func (*AutumnStreamClient) MustAllocNewExtent

func (sc *AutumnStreamClient) MustAllocNewExtent() error

alloc new extent, and reset sc.end = 0

func (*AutumnStreamClient) NewLogEntryIter

func (sc *AutumnStreamClient) NewLogEntryIter(opts ...ReadOption) LogEntryIter

func (*AutumnStreamClient) Truncate

func (sc *AutumnStreamClient) Truncate(ctx context.Context, extentID uint64, gabageKey string) (*pb.BlobStreams, error)

Truncate keeps extentID, if gabageKey is not nil, move all extents before extentID to gabageKey, else unref all extents.

type BlockReader

type BlockReader interface {
	Read(ctx context.Context, extentID uint64, offset uint32, numOfBlocks uint32, hint byte) ([]*pb.Block, uint32, error)
}

random read block

type LogEntryIter

type LogEntryIter interface {
	HasNext() (bool, error)
	Next() *pb.EntryInfo
}

type MockBlockReader

type MockBlockReader struct {
	utils.SafeMutex //protect exs
	// contains filtered or unexported fields
}

func NewMockBlockReader

func NewMockBlockReader() *MockBlockReader

func (*MockBlockReader) Read

func (br *MockBlockReader) Read(ctx context.Context, extentID uint64, offset uint32, numOfBlocks uint32, hint byte) ([]*pb.Block, uint32, error)

type MockEtcd

type MockEtcd struct {
	Tables []*pspb.Location
}

func (*MockEtcd) SetRowStreamTables

func (c *MockEtcd) SetRowStreamTables(id uint64, tables []*pspb.Location) error

type MockLockEntryIter

type MockLockEntryIter struct {
	// contains filtered or unexported fields
}

func (*MockLockEntryIter) HasNext

func (iter *MockLockEntryIter) HasNext() (bool, error)

func (*MockLockEntryIter) Next

func (iter *MockLockEntryIter) Next() *pb.EntryInfo

type MockStreamClient

type MockStreamClient struct {
	ID uint64
	// contains filtered or unexported fields
}

func (*MockStreamClient) Append

func (client *MockStreamClient) Append(ctx context.Context, blocks []*pb.Block, mustSync bool) (uint64, []uint32, uint32, error)

block API

func (*MockStreamClient) AppendEntries

func (client *MockStreamClient) AppendEntries(ctx context.Context, entries []*pb.EntryInfo, mustSync bool) (uint64, uint32, error)

block API, entries has been batched

func (*MockStreamClient) CheckCommitLength

func (client *MockStreamClient) CheckCommitLength() error

func (*MockStreamClient) Close

func (client *MockStreamClient) Close()

func (*MockStreamClient) Connect

func (client *MockStreamClient) Connect() error

func (*MockStreamClient) End

func (client *MockStreamClient) End() uint32

func (*MockStreamClient) NewLogEntryIter

func (client *MockStreamClient) NewLogEntryIter(opts ...ReadOption) LogEntryIter

func (*MockStreamClient) Truncate

func (client *MockStreamClient) Truncate(ctx context.Context, extentID uint64, gabageKey string) (*pb.BlobStreams, error)

type ReadOption

type ReadOption func(*readOption)

func WithReadFrom

func WithReadFrom(extentID uint64, offset uint32) ReadOption

func WithReadFromStart

func WithReadFromStart() ReadOption

func WithReplay

func WithReplay() ReadOption

type StreamClient

type StreamClient interface {
	Connect() error
	Close()
	AppendEntries(ctx context.Context, entries []*pb.EntryInfo, mustSync bool) (uint64, uint32, error)
	Append(ctx context.Context, blocks []*pb.Block, mustSync bool) (extentID uint64, offsets []uint32, end uint32, err error)
	NewLogEntryIter(opt ...ReadOption) LogEntryIter
	//Read(ctx context.Context, extentID uint64, offset uint32, numOfBlocks uint32) ([]*pb.Block, uint32, error)
	Truncate(ctx context.Context, extentID uint64, gabageKey string) (*pb.BlobStreams, error)
	//FIXME: stat => ([]extentID , offset)
	End() uint32
}

func NewMockStreamClient

func NewMockStreamClient(suffix string, br *MockBlockReader) StreamClient

func OpenMockStreamClient

func OpenMockStreamClient(si pb.StreamInfo, br *MockBlockReader) StreamClient

only open log file

type StreamLock

type StreamLock struct {
	// contains filtered or unexported fields
}

func MutexToLock

func MutexToLock(mutex *concurrency.Mutex) StreamLock

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL