commitlog

package
v0.0.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2019 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DeleteCleanupPolicy  = "delete"
	CompactCleanupPolicy = "compact"

	LogFileSuffix   = ".log"
	IndexFileSuffix = ".index"
)

Variables

View Source
var (
	ErrSegmentNotFound = errors.New("segment not found")
	Encoding           = binary.BigEndian
)
View Source
var (
	ErrIndexCorrupt = errors.New("corrupt index file")
)

Functions

func Hash

func Hash(b []byte) uint64

Types

type Cleaner

type Cleaner interface {
	Clean([]*Segment) ([]*Segment, error)
}

type CleanupPolicy

type CleanupPolicy string

type CommitLog

type CommitLog struct {
	Options
	// contains filtered or unexported fields
}

func New

func New(opts Options) (*CommitLog, error)

func (*CommitLog) Append

func (l *CommitLog) Append(b []byte) (offset int64, err error)

func (*CommitLog) Close

func (l *CommitLog) Close() error

func (*CommitLog) Delete

func (l *CommitLog) Delete() error

func (*CommitLog) NewReader

func (l *CommitLog) NewReader(offset int64, maxBytes int32) (io.Reader, error)

func (*CommitLog) NewestOffset

func (l *CommitLog) NewestOffset() int64

func (*CommitLog) OldestOffset

func (l *CommitLog) OldestOffset() int64

func (*CommitLog) Read

func (l *CommitLog) Read(p []byte) (n int, err error)

func (*CommitLog) Segments

func (l *CommitLog) Segments() []*Segment

func (*CommitLog) Truncate

func (l *CommitLog) Truncate(offset int64) error

type CompactCleaner

type CompactCleaner struct {
	// contains filtered or unexported fields
}

func NewCompactCleaner

func NewCompactCleaner() *CompactCleaner

func (*CompactCleaner) Clean

func (c *CompactCleaner) Clean(segments []*Segment) (cleaned []*Segment, err error)

type DeleteCleaner

type DeleteCleaner struct {
	Retention struct {
		Bytes int64
	}
}

func NewDeleteCleaner

func NewDeleteCleaner(bytes int64) *DeleteCleaner

func (*DeleteCleaner) Clean

func (c *DeleteCleaner) Clean(segments []*Segment) ([]*Segment, error)

type Entry

type Entry struct {
	Offset   int64
	Position int64
}

type Index

type Index struct {
	// contains filtered or unexported fields
}

func NewIndex

func NewIndex(opts options) (idx *Index, err error)

func (*Index) Close

func (idx *Index) Close() (err error)

func (*Index) Name

func (idx *Index) Name() string

func (*Index) ReadAt

func (idx *Index) ReadAt(p []byte, offset int64) (n int, err error)

func (*Index) ReadEntryAtFileOffset

func (idx *Index) ReadEntryAtFileOffset(e *Entry, fileOffset int64) (err error)

ReadEntryAtFileOffset reads an Index entry at the given byte offset of the Index file. ReadEntryAtLogOffset is generally more useful for higher-level use.

func (*Index) ReadEntryAtLogOffset

func (idx *Index) ReadEntryAtLogOffset(e *Entry, logOffset int64) error

ReadEntryAtLogOffset is used to read an Index entry at the given log offset of the Index file.

func (*Index) SanityCheck

func (idx *Index) SanityCheck() error

func (*Index) Sync

func (idx *Index) Sync() error

func (*Index) TruncateEntries

func (idx *Index) TruncateEntries(number int) error

func (*Index) Write

func (idx *Index) Write(p []byte) (n int, err error)

func (*Index) WriteAt

func (idx *Index) WriteAt(p []byte, offset int64) (n int)

func (*Index) WriteEntry

func (idx *Index) WriteEntry(entry Entry) (err error)

type IndexScanner

type IndexScanner struct {
	// contains filtered or unexported fields
}

func NewIndexScanner

func NewIndexScanner(idx *Index) *IndexScanner

func (*IndexScanner) Scan

func (s *IndexScanner) Scan() (*Entry, error)

type Message

type Message []byte

func NewMessage

func NewMessage(p []byte) Message

func (Message) Attributes

func (m Message) Attributes() int8

func (Message) Crc

func (m Message) Crc() int32

func (Message) Key

func (m Message) Key() []byte

func (Message) MagicByte

func (m Message) MagicByte() int8

func (Message) Size

func (m Message) Size() int32

func (Message) Timestamp

func (m Message) Timestamp() int64

func (Message) Value

func (m Message) Value() []byte

type MessageSet

type MessageSet []byte

func NewMessageSet

func NewMessageSet(offset uint64, msgs ...Message) MessageSet

func (MessageSet) Messages

func (ms MessageSet) Messages() (msgs []Message)

func (MessageSet) Offset

func (ms MessageSet) Offset() int64

func (MessageSet) Payload

func (ms MessageSet) Payload() []byte

func (MessageSet) PutOffset

func (ms MessageSet) PutOffset(offset int64)

func (MessageSet) Size

func (ms MessageSet) Size() int32

type Options

type Options struct {
	Path string
	// MaxSegmentBytes is the maximum number of bytes a segment can contain. Once the limit is
	// hit, a new segment is split off.
	MaxSegmentBytes int64
	MaxLogBytes     int64
	CleanupPolicy   CleanupPolicy
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

type Segment

type Segment struct {
	Index      *Index
	BaseOffset int64
	NextOffset int64
	Position   int64

	sync.Mutex
	// contains filtered or unexported fields
}

func NewSegment

func NewSegment(path string, baseOffset, maxBytes int64, args ...interface{}) (*Segment, error)

func (*Segment) BuildIndex

func (s *Segment) BuildIndex() (err error)

func (*Segment) Cleaner

func (s *Segment) Cleaner() (*Segment, error)

Cleaner creates a cleaner segment for this segment.

func (*Segment) Close

func (s *Segment) Close() error

func (*Segment) Delete

func (s *Segment) Delete() error

Delete closes the segment and then deletes its log and index files.

func (*Segment) IsFull

func (s *Segment) IsFull() bool

func (*Segment) Read

func (s *Segment) Read(p []byte) (n int, err error)

func (*Segment) ReadAt

func (s *Segment) ReadAt(p []byte, off int64) (n int, err error)

func (*Segment) Replace

func (s *Segment) Replace(old *Segment) (err error)

Replace replaces the given segment (old) with the receiver (s).

func (*Segment) SetupIndex

func (s *Segment) SetupIndex() (err error)

SetupIndex creates and initializes an Index. Initialization consists of: (1) a sanity check of the loaded Index; (2) truncating the Index (clearing it); (3) reading the log file from the beginning and re-initializing the Index.

func (*Segment) Write

func (s *Segment) Write(p []byte) (n int, err error)

Write writes a byte slice to the log at the current position. It increments the offset and sets the position to the new tail.

type SegmentScanner

type SegmentScanner struct {
	// contains filtered or unexported fields
}

func NewSegmentScanner

func NewSegmentScanner(segment *Segment) *SegmentScanner

func (*SegmentScanner) Scan

func (s *SegmentScanner) Scan() (ms MessageSet, err error)

Scan should be called repeatedly to iterate over the messages in the segment; it returns io.EOF when there are no more messages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL