commitlog

package
v0.0.0-...-4f2e7f3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2018 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package commitlog provides an implementation for a file-backed write-ahead log.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrIndexCorrupt = errors.New("corrupt index file")
)
View Source
var (
	ErrSegmentNotFound = errors.New("segment not found")
)

Functions

func Hash

func Hash(b []byte) uint64

func NewMessageSetFromProto

func NewMessageSetFromProto(baseOffset, basePos int64, msgs []*proto.Message) (
	MessageSet, []Entry, error)

Types

type Cleaner

type Cleaner interface {
	Clean([]*Segment) ([]*Segment, error)
}

type CommitLog

type CommitLog struct {
	Options
	// contains filtered or unexported fields
}

CommitLog implements the server.CommitLog interface, which is a durable write-ahead log.

func New

func New(opts Options) (*CommitLog, error)

New creates a new CommitLog and starts a background goroutine which periodically checkpoints the high watermark to disk.

func (*CommitLog) Append

func (l *CommitLog) Append(msgs []*proto.Message) ([]int64, error)

Append writes the given batch of messages to the log and returns their corresponding offsets in the log.

func (*CommitLog) AppendMessageSet

func (l *CommitLog) AppendMessageSet(ms []byte) ([]int64, error)

AppendMessageSet writes the given message set data to the log and returns the corresponding offsets in the log.

func (*CommitLog) Close

func (l *CommitLog) Close() error

Close closes each log segment file and stops the background goroutine checkpointing the high watermark to disk.

func (*CommitLog) Delete

func (l *CommitLog) Delete() error

Delete closes the log and removes all data associated with it from the filesystem.

func (*CommitLog) HighWatermark

func (l *CommitLog) HighWatermark() int64

HighWatermark returns the high watermark for the log.

func (*CommitLog) NewReaderCommitted

func (l *CommitLog) NewReaderCommitted(ctx context.Context, offset int64) (io.Reader, error)

NewReaderCommitted returns an io.Reader which reads only committed data from the log starting at the given offset.

func (*CommitLog) NewReaderUncommitted

func (l *CommitLog) NewReaderUncommitted(ctx context.Context, offset int64) (io.Reader, error)

NewReaderUncommitted returns an io.Reader which reads data from the log starting at the given offset.

func (*CommitLog) NewestOffset

func (l *CommitLog) NewestOffset() int64

NewestOffset returns the offset of the last message in the log.

func (*CommitLog) OldestOffset

func (l *CommitLog) OldestOffset() int64

OldestOffset returns the offset of the first message in the log.

func (*CommitLog) Segments

func (l *CommitLog) Segments() []*Segment

func (*CommitLog) SetHighWatermark

func (l *CommitLog) SetHighWatermark(hw int64)

SetHighWatermark sets the high watermark on the log. All messages up to and including the high watermark are considered committed.

func (*CommitLog) Truncate

func (l *CommitLog) Truncate(offset int64) error

Truncate removes all messages from the log starting at the given offset.

type CommittedReader

type CommittedReader struct {
	// contains filtered or unexported fields
}

func (*CommittedReader) Read

func (r *CommittedReader) Read(p []byte) (n int, err error)

type CompactCleaner

type CompactCleaner struct {
	// contains filtered or unexported fields
}

func NewCompactCleaner

func NewCompactCleaner() *CompactCleaner

func (*CompactCleaner) Clean

func (c *CompactCleaner) Clean(segments []*Segment) (cleaned []*Segment, err error)

type DeleteCleaner

type DeleteCleaner struct {
	Retention struct {
		Bytes int64
	}
}

func NewDeleteCleaner

func NewDeleteCleaner(bytes int64) *DeleteCleaner

func (*DeleteCleaner) Clean

func (c *DeleteCleaner) Clean(segments []*Segment) ([]*Segment, error)

type Entry

type Entry struct {
	Offset   int64
	Position int64
	Size     int32
}

func EntriesForMessageSet

func EntriesForMessageSet(baseOffset, basePos int64, ms []byte) []Entry

type Index

type Index struct {
	// contains filtered or unexported fields
}

func NewIndex

func NewIndex(opts options) (idx *Index, err error)

func (*Index) Close

func (idx *Index) Close() (err error)

func (*Index) InitializePosition

func (idx *Index) InitializePosition() (*Entry, error)

func (*Index) Name

func (idx *Index) Name() string

func (*Index) ReadAt

func (idx *Index) ReadAt(p []byte, offset int64) (n int, err error)

func (*Index) ReadEntryAtFileOffset

func (idx *Index) ReadEntryAtFileOffset(e *Entry, fileOffset int64) (err error)

ReadEntryAtFileOffset is used to read an Index entry at the given byte offset of the Index file. ReadEntryAtLogOffset is generally more useful for higher-level use.

func (*Index) ReadEntryAtLogOffset

func (idx *Index) ReadEntryAtLogOffset(e *Entry, logOffset int64) error

ReadEntryAtLogOffset is used to read an Index entry at the given log offset of the Index file.

func (*Index) Sync

func (idx *Index) Sync() error

func (*Index) TruncateEntries

func (idx *Index) TruncateEntries(number int) error

func (*Index) WriteAt

func (idx *Index) WriteAt(p []byte, offset int64) (n int)

func (*Index) WriteEntries

func (idx *Index) WriteEntries(entries []Entry) (err error)

type IndexScanner

type IndexScanner struct {
	// contains filtered or unexported fields
}

func NewIndexScanner

func NewIndexScanner(idx *Index) *IndexScanner

func (*IndexScanner) Scan

func (s *IndexScanner) Scan() (*Entry, error)

type Message

type Message []byte

func ReadMessage

func ReadMessage(reader io.Reader, headersBuf []byte) (Message, int64, error)

func (Message) Attributes

func (m Message) Attributes() int8

func (Message) Crc

func (m Message) Crc() int32

func (Message) Headers

func (m Message) Headers() map[string][]byte

func (Message) Key

func (m Message) Key() []byte

func (Message) MagicByte

func (m Message) MagicByte() int8

func (Message) Timestamp

func (m Message) Timestamp() int64

func (Message) Value

func (m Message) Value() []byte

type MessageSet

type MessageSet []byte

func (MessageSet) Message

func (ms MessageSet) Message() Message

func (MessageSet) Offset

func (ms MessageSet) Offset() int64

func (MessageSet) Size

func (ms MessageSet) Size() int32

type Options

type Options struct {
	Path string
	// MaxSegmentBytes is the max number of bytes a segment can contain. Once the limit is hit, a
	// new segment will be split off.
	MaxSegmentBytes      int64
	MaxLogBytes          int64
	Compact              bool
	HWCheckpointInterval time.Duration
	Logger               logger.Logger
}

Options contains settings for configuring a CommitLog.

type Segment

type Segment struct {
	Index      *Index
	BaseOffset int64

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSegment

func NewSegment(path string, baseOffset, maxBytes int64, args ...interface{}) (*Segment, error)

func (*Segment) Cleaner

func (s *Segment) Cleaner() (*Segment, error)

Cleaner creates a cleaner segment for this segment.

func (*Segment) Close

func (s *Segment) Close() error

func (*Segment) Delete

func (s *Segment) Delete() error

Delete closes the segment and then deletes its log and index files.

func (*Segment) IsFull

func (s *Segment) IsFull() bool

func (*Segment) NextOffset

func (s *Segment) NextOffset() int64

func (*Segment) Position

func (s *Segment) Position() int64

func (*Segment) ReadAt

func (s *Segment) ReadAt(p []byte, off int64) (n int, err error)

func (*Segment) Replace

func (s *Segment) Replace(old *Segment) (err error)

Replace replaces the given segment with the receiver.

func (*Segment) SetupIndex

func (s *Segment) SetupIndex() (err error)

SetupIndex creates and initializes an Index. Initialization consists of two steps: initializing the Index position and initializing the Segment nextOffset.

func (*Segment) Write

func (s *Segment) Write(p []byte, numMsgs int) (n int, err error)

Write writes a byte slice to the log at the current position. It increments the offset and sets the position to the new tail.

func (*Segment) WriteMessageSet

func (s *Segment) WriteMessageSet(ms []byte, entries []Entry) error

type SegmentScanner

type SegmentScanner struct {
	// contains filtered or unexported fields
}

func NewSegmentScanner

func NewSegmentScanner(segment *Segment) *SegmentScanner

func (*SegmentScanner) Scan

func (s *SegmentScanner) Scan() (ms MessageSet, err error)

Scan should be called repeatedly to iterate over the messages in the segment; it returns io.EOF when there are no more messages.

type UncommittedReader

type UncommittedReader struct {
	// contains filtered or unexported fields
}

func (*UncommittedReader) Read

func (r *UncommittedReader) Read(p []byte) (n int, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL