Documentation
¶
Index ¶
- Constants
- Variables
- func ReadNext(r io.Reader, w io.Writer, hBuf []byte) error
- type CompressedSegmentHeader
- type CompressionMethod
- type FileChannel
- func (fc *FileChannel) Close() error
- func (fc *FileChannel) DiskUsage() (uint64, error)
- func (fc *FileChannel) Flush() error
- func (fc *FileChannel) FlushOffset() uint64
- func (fc *FileChannel) Iterator() *Iterator
- func (fc *FileChannel) IteratorAcknowledgable() *Iterator
- func (fc *FileChannel) Open() error
- func (fc *FileChannel) Write(p []byte) (err error)
- func (fc *FileChannel) WriteOffset() uint64
- type Iterator
- type MessageHeader
- type Option
- type PlainSegmentHeader
- type Position
- type SegmentFileState
- type SegmentManager
- func (sm *SegmentManager) AdvanceReader(prev uint32, delta uint32) (uint32, uint32)
- func (sm *SegmentManager) CloseReader(cur uint32) uint32
- func (sm *SegmentManager) CurrentSegmentIndex() uint32
- func (sm *SegmentManager) CurrentSegmentWatermark() uint32
- func (sm *SegmentManager) GetBeginIndex() uint32
- func (sm *SegmentManager) IncSegmentIndex() uint32
- func (sm *SegmentManager) NewReader() uint32
- func (sm *SegmentManager) Pin(index uint32) bool
- func (sm *SegmentManager) SegmentFile(index uint32, state SegmentFileState) string
- func (sm *SegmentManager) SetBeginIndex(index uint32)
- func (sm *SegmentManager) Unpin(index uint32)
- func (sm *SegmentManager) WaitUntilWatermarkAbove(ctx context.Context, index uint32) (uint32, error)
Constants ¶
View Source
const (
IteratorBufferLimit = 1 << 20
)
View Source
const MessageHeaderBinarySize = 8
View Source
const SegmentHeaderBinarySize = 64
Variables ¶
View Source
var ( ErrChecksumMismatch = errors.New("channel corrupted: checksum mismatch") ErrChannelClosed = errors.New("channel closed") ErrNotEnoughMessages = errors.New("not enough messages") ErrNotEnoughReadToAck = errors.New("not enough read to ack") )
View Source
var ( DefaultFlushInterval = 100 * time.Microsecond // 100us DefaultRotateThreshold = uint64(512 << 20) // 512 MB )
Functions ¶
Types ¶
type CompressedSegmentHeader ¶
type CompressedSegmentHeader struct { SegmentID uint32 BeginOffset uint64 EndOffset uint64 CompressionMethod CompressionMethod }
func (*CompressedSegmentHeader) Decode ¶
func (h *CompressedSegmentHeader) Decode(b []byte)
func (*CompressedSegmentHeader) Encode ¶
func (h *CompressedSegmentHeader) Encode(b []byte)
type FileChannel ¶
type FileChannel struct {
// contains filtered or unexported fields
}
func NewFileChannel ¶
func NewFileChannel(dir string, opts ...Option) *FileChannel
func OpenFileChannel ¶
func OpenFileChannel(dir string, opts ...Option) (*FileChannel, error)
func (*FileChannel) Close ¶
func (fc *FileChannel) Close() error
func (*FileChannel) DiskUsage ¶ added in v0.1.1
func (fc *FileChannel) DiskUsage() (uint64, error)
func (*FileChannel) Flush ¶
func (fc *FileChannel) Flush() error
func (*FileChannel) FlushOffset ¶ added in v0.1.1
func (fc *FileChannel) FlushOffset() uint64
func (*FileChannel) Iterator ¶
func (fc *FileChannel) Iterator() *Iterator
func (*FileChannel) IteratorAcknowledgable ¶
func (fc *FileChannel) IteratorAcknowledgable() *Iterator
func (*FileChannel) Open ¶
func (fc *FileChannel) Open() error
func (*FileChannel) Write ¶
func (fc *FileChannel) Write(p []byte) (err error)
func (*FileChannel) WriteOffset ¶ added in v0.1.1
func (fc *FileChannel) WriteOffset() uint64
type Iterator ¶
type Iterator struct {
// contains filtered or unexported fields
}
func NewIterator ¶
func NewIterator(manager *SegmentManager, position *Position, autoAck bool) *Iterator
type MessageHeader ¶
func (*MessageHeader) Decode ¶
func (h *MessageHeader) Decode(b []byte)
func (*MessageHeader) Encode ¶
func (h *MessageHeader) Encode(b []byte)
type Option ¶
type Option func(*FileChannel)
func FlushInterval ¶
func RotateThreshold ¶
func WithCompressionMethod ¶ added in v0.1.6
func WithCompressionMethod(method CompressionMethod) Option
WithCompressionMethod sets the compression method for the file channel.
type PlainSegmentHeader ¶
func (*PlainSegmentHeader) Decode ¶
func (h *PlainSegmentHeader) Decode(b []byte)
func (*PlainSegmentHeader) Encode ¶
func (h *PlainSegmentHeader) Encode(b []byte)
type Position ¶
type Position struct {
// contains filtered or unexported fields
}
func NewPosition ¶
type SegmentFileState ¶
type SegmentFileState int
const ( Plain SegmentFileState = iota Compressing Compressed )
func ParseSegmentIndexAndState ¶
func ParseSegmentIndexAndState(file string) (uint32, SegmentFileState, error)
type SegmentManager ¶
type SegmentManager struct {
// contains filtered or unexported fields
}
func NewSegmentManager ¶
func NewSegmentManager(dir string) *SegmentManager
func (*SegmentManager) AdvanceReader ¶
func (sm *SegmentManager) AdvanceReader(prev uint32, delta uint32) (uint32, uint32)
func (*SegmentManager) CloseReader ¶
func (sm *SegmentManager) CloseReader(cur uint32) uint32
func (*SegmentManager) CurrentSegmentIndex ¶
func (sm *SegmentManager) CurrentSegmentIndex() uint32
func (*SegmentManager) CurrentSegmentWatermark ¶
func (sm *SegmentManager) CurrentSegmentWatermark() uint32
func (*SegmentManager) GetBeginIndex ¶
func (sm *SegmentManager) GetBeginIndex() uint32
func (*SegmentManager) IncSegmentIndex ¶
func (sm *SegmentManager) IncSegmentIndex() uint32
func (*SegmentManager) NewReader ¶
func (sm *SegmentManager) NewReader() uint32
func (*SegmentManager) Pin ¶
func (sm *SegmentManager) Pin(index uint32) bool
func (*SegmentManager) SegmentFile ¶
func (sm *SegmentManager) SegmentFile(index uint32, state SegmentFileState) string
func (*SegmentManager) SetBeginIndex ¶
func (sm *SegmentManager) SetBeginIndex(index uint32)
func (*SegmentManager) Unpin ¶
func (sm *SegmentManager) Unpin(index uint32)
func (*SegmentManager) WaitUntilWatermarkAbove ¶
Click to show internal directories.
Click to hide internal directories.