Documentation
¶
Overview ¶
Package commitlog provides an implementation for a file-backed write-ahead log.
Index ¶
- Variables
- func NewMessageSetFromProto(baseOffset, basePos int64, msgs []*proto.Message) (MessageSet, []*Entry, error)
- type CommitLog
- func (l *CommitLog) Append(msgs []*proto.Message) ([]int64, error)
- func (l *CommitLog) AppendMessageSet(ms []byte) ([]int64, error)
- func (l *CommitLog) Clean() error
- func (l *CommitLog) Close() error
- func (l *CommitLog) Delete() error
- func (l *CommitLog) HighWatermark() int64
- func (l *CommitLog) NewReader(offset int64, uncommitted bool) (*Reader, error)
- func (l *CommitLog) NewestOffset() int64
- func (l *CommitLog) OffsetForTimestamp(timestamp int64) (int64, error)
- func (l *CommitLog) OldestOffset() int64
- func (l *CommitLog) Segments() []*Segment
- func (l *CommitLog) SetHighWatermark(hw int64)
- func (l *CommitLog) Truncate(offset int64) error
- type CompactCleaner
- type CompactCleanerOptions
- type DeleteCleaner
- type DeleteCleanerOptions
- type Entry
- type Index
- func (idx *Index) Close() error
- func (idx *Index) CountEntries() int64
- func (idx *Index) InitializePosition() (*Entry, error)
- func (idx *Index) Name() string
- func (idx *Index) Position() int64
- func (idx *Index) ReadAt(p []byte, offset int64) (n int, err error)
- func (idx *Index) ReadEntryAtFileOffset(e *Entry, fileOffset int64) (err error)
- func (idx *Index) ReadEntryAtLogOffset(e *Entry, logOffset int64) error
- func (idx *Index) Shrink() error
- func (idx *Index) Sync() error
- func (idx *Index) TruncateEntries(number int) error
- type IndexScanner
- type Message
- type MessageSet
- type Options
- type Reader
- type Segment
- func (s *Segment) CheckSplit(logRollTime time.Duration) bool
- func (s *Segment) Cleaned() (*Segment, error)
- func (s *Segment) Close() error
- func (s *Segment) Delete() error
- func (s *Segment) FirstOffset() int64
- func (s *Segment) IsEmpty() bool
- func (s *Segment) LastOffset() int64
- func (s *Segment) MessageCount() int64
- func (s *Segment) NextOffset() int64
- func (s *Segment) Position() int64
- func (s *Segment) ReadAt(p []byte, off int64) (n int, err error)
- func (s *Segment) Replace(old *Segment) error
- func (s *Segment) Seal()
- func (s *Segment) Truncated() (*Segment, error)
- func (s *Segment) WriteMessageSet(ms []byte, entries []*Entry) error
- type SegmentScanner
Constants ¶
This section is empty.
Variables ¶
var ( // ErrEntryNotFound is returned when a segment search cannot find a // specific entry. ErrEntryNotFound = errors.New("entry not found") // ErrSegmentClosed is returned on reads/writes to a closed segment. ErrSegmentClosed = errors.New("segment has been closed") // ErrSegmentExists is returned when attempting to create a segment that // already exists. ErrSegmentExists = errors.New("segment already exists") // ErrSegmentReplaced is returned when attempting to read from a segment // that has been replaced due to log compaction. When this error is // encountered, operations should be retried in order to run against the // new segment. ErrSegmentReplaced = errors.New("segment was replaced") )
var (
ErrIndexCorrupt = errors.New("corrupt index file")
)
var ErrSegmentNotFound = errors.New("segment not found")
Functions ¶
func NewMessageSetFromProto ¶
Types ¶
type CommitLog ¶
type CommitLog struct { Options // contains filtered or unexported fields }
CommitLog implements the server.CommitLog interface, which is a durable write-ahead log.
func New ¶
New creates a new CommitLog and starts a background goroutine which periodically checkpoints the high watermark to disk.
func (*CommitLog) Append ¶
Append writes the given batch of messages to the log and returns their corresponding offsets in the log.
func (*CommitLog) AppendMessageSet ¶
AppendMessageSet writes the given message set data to the log and returns the corresponding offsets in the log.
func (*CommitLog) Clean ¶
Clean applies retention and compaction rules against the log, if applicable.
func (*CommitLog) Close ¶
Close closes each log segment file and stops the background goroutine checkpointing the high watermark to disk.
func (*CommitLog) Delete ¶
Delete closes the log and removes all data associated with it from the filesystem.
func (*CommitLog) HighWatermark ¶
HighWatermark returns the high watermark for the log.
func (*CommitLog) NewReader ¶
NewReader creates a new Reader starting at the given offset. If uncommitted is true, the Reader will read uncommitted messages from the log. Otherwise, it will only return committed messages.
func (*CommitLog) NewestOffset ¶
NewestOffset returns the offset of the last message in the log or -1 if empty.
func (*CommitLog) OffsetForTimestamp ¶
OffsetForTimestamp returns the earliest offset whose timestamp is greater than or equal to the given timestamp.
func (*CommitLog) OldestOffset ¶
OldestOffset returns the offset of the first message in the log or -1 if empty.
func (*CommitLog) SetHighWatermark ¶
SetHighWatermark sets the high watermark on the log. All messages up to and including the high watermark are considered committed.
type CompactCleaner ¶
type CompactCleaner struct {
CompactCleanerOptions
}
CompactCleaner implements the compaction policy which replaces segments with compacted ones, i.e. retaining only the last message for a given key.
func NewCompactCleaner ¶
func NewCompactCleaner(opts CompactCleanerOptions) *CompactCleaner
NewCompactCleaner returns a new Cleaner which performs log compaction by rewriting segments such that they contain only the last message for a given key.
func (*CompactCleaner) Compact ¶
func (c *CompactCleaner) Compact(hw int64, segments []*Segment) ([]*Segment, error)
Compact performs log compaction by rewriting segments such that they contain only the last message for a given key. Compaction is applied to all segments up to but excluding the active (last) segment or the provided HW, whichever comes first.
type CompactCleanerOptions ¶
CompactCleanerOptions contains configuration settings for the CompactCleaner.
type DeleteCleaner ¶
type DeleteCleaner struct {
DeleteCleanerOptions
}
DeleteCleaner implements the delete cleanup policy which deletes old log segments based on the retention policy.
func NewDeleteCleaner ¶
func NewDeleteCleaner(opts DeleteCleanerOptions) *DeleteCleaner
NewDeleteCleaner returns a new Cleaner which enforces log retention policies by deleting segments.
type DeleteCleanerOptions ¶
type DeleteCleanerOptions struct { Retention struct { Bytes int64 Messages int64 Age time.Duration } Logger logger.Logger Name string }
DeleteCleanerOptions contains configuration settings for the DeleteCleaner.
type Entry ¶
func EntriesForMessageSet ¶
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
func (*Index) CountEntries ¶
func (*Index) InitializePosition ¶
func (*Index) Position ¶
Position returns the current position in the index to write to next. This value also represents the total length of the index.
func (*Index) ReadEntryAtFileOffset ¶
ReadEntryAtFileOffset is used to read an Index entry at the given byte offset of the Index file. ReadEntryAtLogOffset is generally more useful for higher level use.
func (*Index) ReadEntryAtLogOffset ¶
ReadEntryAtLogOffset is used to read an Index entry at the given log offset of the Index file.
func (*Index) TruncateEntries ¶
type IndexScanner ¶
type IndexScanner struct {
// contains filtered or unexported fields
}
func NewIndexScanner ¶
func NewIndexScanner(idx *Index) *IndexScanner
func (*IndexScanner) Scan ¶
func (s *IndexScanner) Scan() (*Entry, error)
type MessageSet ¶
type MessageSet []byte
func (MessageSet) Message ¶
func (ms MessageSet) Message() Message
func (MessageSet) Offset ¶
func (ms MessageSet) Offset() int64
func (MessageSet) Size ¶
func (ms MessageSet) Size() int32
func (MessageSet) Timestamp ¶
func (ms MessageSet) Timestamp() int64
type Options ¶
type Options struct { Path string // Path to log directory MaxSegmentBytes int64 // Max bytes a Segment can contain before creating a new one MaxLogBytes int64 // Retention by bytes MaxLogMessages int64 // Retention by messages MaxLogAge time.Duration // Retention by age Compact bool // Run compaction on log clean CompactMaxGoroutines int // Max number of goroutines to use in a log compaction CleanerInterval time.Duration // Frequency to enforce retention policy HWCheckpointInterval time.Duration // Frequency to checkpoint HW to disk LogRollTime time.Duration // Max time before a new log segment is rolled out. Logger logger.Logger }
Options contains settings for configuring a CommitLog.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader reads messages atomically from a CommitLog. Readers should not be used concurrently.
func (*Reader) ReadMessage ¶
ReadMessage reads a single message from the underlying CommitLog or blocks until one is available. It returns the Message in addition to its offset and timestamp. This may return uncommitted messages if the Reader was created with the uncommitted flag set to true.
ReadMessage should not be called concurrently, and the headersBuf slice should have a capacity of at least 20.
type Segment ¶
type Segment struct { Index *Index BaseOffset int64 sync.RWMutex // contains filtered or unexported fields }
func NewSegment ¶
func (*Segment) CheckSplit ¶
CheckSplit determines if a new log segment should be rolled out either because this segment is full or LogRollTime has passed since the first message was written to the segment.
func (*Segment) Close ¶
Close a segment such that it can no longer be read from or written to. This operation is idempotent.
func (*Segment) FirstOffset ¶
func (*Segment) LastOffset ¶
func (*Segment) MessageCount ¶
func (*Segment) NextOffset ¶
func (*Segment) Seal ¶
func (s *Segment) Seal()
Seal a segment from being written to. This is called on the former active segment after a new segment is rolled. This is a no-op if the segment is already sealed.
type SegmentScanner ¶
type SegmentScanner struct {
// contains filtered or unexported fields
}
func NewSegmentScanner ¶
func NewSegmentScanner(segment *Segment) *SegmentScanner
func (*SegmentScanner) Scan ¶
func (s *SegmentScanner) Scan() (MessageSet, *Entry, error)
Scan should be called repeatedly to iterate over the messages in the segment, it will return io.EOF when there are no more messages.