filechannel

package
Version: v0.1.9
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2024 | License: Apache-2.0 | Imports: 24 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	IteratorBufferLimit = 1 << 20
)
View Source
const MessageHeaderBinarySize = 8
View Source
const SegmentHeaderBinarySize = 64

Variables

View Source
var (
	ErrChecksumMismatch   = errors.New("channel corrupted: checksum mismatch")
	ErrChannelClosed      = errors.New("channel closed")
	ErrNotEnoughMessages  = errors.New("not enough messages")
	ErrNotEnoughReadToAck = errors.New("not enough read to ack")
)
View Source
var (
	DefaultFlushInterval   = 100 * time.Microsecond // 100us
	DefaultRotateThreshold = uint64(512 << 20)      // 512 MB
)

Functions

func ReadNext

func ReadNext(r io.Reader, w io.Writer, hBuf []byte) error

ReadNext reads the next message from the reader. If any error occurs, it returns immediately. If the message is not fully read, it returns io.ErrUnexpectedEOF. If the checksum does not match, it returns ErrChecksumMismatch. It returns io.EOF if and only if no bytes were read.

Types

type CompressedSegmentHeader

type CompressedSegmentHeader struct {
	SegmentID         uint32
	BeginOffset       uint64
	EndOffset         uint64
	CompressionMethod CompressionMethod
}

func (*CompressedSegmentHeader) Decode

func (h *CompressedSegmentHeader) Decode(b []byte)

func (*CompressedSegmentHeader) Encode

func (h *CompressedSegmentHeader) Encode(b []byte)

type CompressionMethod

type CompressionMethod byte
const (
	Snappy CompressionMethod = iota
	Gzip
)

type FileChannel

type FileChannel struct {
	// contains filtered or unexported fields
}

func NewFileChannel

func NewFileChannel(dir string, opts ...Option) *FileChannel

func OpenFileChannel

func OpenFileChannel(dir string, opts ...Option) (*FileChannel, error)

func (*FileChannel) Close

func (fc *FileChannel) Close() error

func (*FileChannel) DiskUsage added in v0.1.1

func (fc *FileChannel) DiskUsage() (uint64, error)

func (*FileChannel) Flush

func (fc *FileChannel) Flush() error

func (*FileChannel) FlushOffset added in v0.1.1

func (fc *FileChannel) FlushOffset() uint64

func (*FileChannel) Iterator

func (fc *FileChannel) Iterator() *Iterator

func (*FileChannel) IteratorAcknowledgable

func (fc *FileChannel) IteratorAcknowledgable() *Iterator

func (*FileChannel) Open

func (fc *FileChannel) Open() error

func (*FileChannel) Write

func (fc *FileChannel) Write(p []byte) (err error)

func (*FileChannel) WriteOffset added in v0.1.1

func (fc *FileChannel) WriteOffset() uint64

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

func NewIterator

func NewIterator(manager *SegmentManager, position *Position, autoAck bool) *Iterator

func (*Iterator) Ack

func (it *Iterator) Ack(n int) error

func (*Iterator) Close

func (it *Iterator) Close() error

func (*Iterator) Next

func (it *Iterator) Next(ctx context.Context) (b []byte, err error)

func (*Iterator) Offset added in v0.1.1

func (it *Iterator) Offset() uint64

func (*Iterator) TryNext

func (it *Iterator) TryNext() ([]byte, error)

type MessageHeader

type MessageHeader struct {
	Length   uint32
	Checksum uint32
}

func (*MessageHeader) Decode

func (h *MessageHeader) Decode(b []byte)

func (*MessageHeader) Encode

func (h *MessageHeader) Encode(b []byte)

type Option

type Option func(*FileChannel)

func FlushInterval

func FlushInterval(d time.Duration) Option

func RotateThreshold

func RotateThreshold(n uint64) Option

func WithCompressionMethod added in v0.1.6

func WithCompressionMethod(method CompressionMethod) Option

WithCompressionMethod sets the compression method for the file channel.

type PlainSegmentHeader

type PlainSegmentHeader struct {
	SegmentID   uint32
	BeginOffset uint64
}

func (*PlainSegmentHeader) Decode

func (h *PlainSegmentHeader) Decode(b []byte)

func (*PlainSegmentHeader) Encode

func (h *PlainSegmentHeader) Encode(b []byte)

type Position

type Position struct {
	// contains filtered or unexported fields
}

func NewPosition

func NewPosition(offset uint64) *Position

func (*Position) Close

func (p *Position) Close()

func (*Position) Get

func (p *Position) Get() uint64

func (*Position) Update

func (p *Position) Update(offset uint64) uint64

func (*Position) Wait

func (p *Position) Wait(ctx context.Context, cond func(uint64) bool) (uint64, error)

type SegmentFileState

type SegmentFileState int
const (
	Plain SegmentFileState = iota
	Compressing
	Compressed
)

func ParseSegmentIndexAndState

func ParseSegmentIndexAndState(file string) (uint32, SegmentFileState, error)

type SegmentManager

type SegmentManager struct {
	// contains filtered or unexported fields
}

func NewSegmentManager

func NewSegmentManager(dir string) *SegmentManager

func (*SegmentManager) AdvanceReader

func (sm *SegmentManager) AdvanceReader(prev uint32, delta uint32) (uint32, uint32)

func (*SegmentManager) CloseReader

func (sm *SegmentManager) CloseReader(cur uint32) uint32

func (*SegmentManager) CurrentSegmentIndex

func (sm *SegmentManager) CurrentSegmentIndex() uint32

func (*SegmentManager) CurrentSegmentWatermark

func (sm *SegmentManager) CurrentSegmentWatermark() uint32

func (*SegmentManager) GetBeginIndex

func (sm *SegmentManager) GetBeginIndex() uint32

func (*SegmentManager) IncSegmentIndex

func (sm *SegmentManager) IncSegmentIndex() uint32

func (*SegmentManager) NewReader

func (sm *SegmentManager) NewReader() uint32

func (*SegmentManager) Pin

func (sm *SegmentManager) Pin(index uint32) bool

func (*SegmentManager) SegmentFile

func (sm *SegmentManager) SegmentFile(index uint32, state SegmentFileState) string

func (*SegmentManager) SetBeginIndex

func (sm *SegmentManager) SetBeginIndex(index uint32)

func (*SegmentManager) Unpin

func (sm *SegmentManager) Unpin(index uint32)

func (*SegmentManager) WaitUntilWatermarkAbove

func (sm *SegmentManager) WaitUntilWatermarkAbove(ctx context.Context, index uint32) (uint32, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL