Documentation ¶
Overview ¶
Package wal provides a WAL (write-ahead logging) facility.
This package aims to provide the most basic implementation of a write-ahead logger possible. For additional functionality, see the "wal/walutil" package.
When writing to a Logger, the []byte (herein referred to as a "chunk", or "data chunk") is written to a "segment". When a segment is full, or there is not enough room left for a write to fully succeed, the segment is passed to a Sink's WriteSegment method, and the []byte is written to a new, empty segment.
A Sink is a type that is capable of storing and retrieving segments. This package provides a Sink implementation that persists segments to a local directory: DirectorySink. Sinks do most of the "heavy lifting" for this package.
Unlike most WAL implementations, the Logger type does not directly expose a means of persisting segments at a regular time interval. This is intentional, and was separated out to keep the implementation of Logger as simple as possible. If you wish to have segments written at a specific time interval, see the documentation for the wal/walutil.FlushInterval function.
This package also provides the means of replaying a log, without requiring the creation of a Logger. For more details, see the NewReader and NewReaderOffset functions.
Index ¶
- Constants
- Variables
- type Analyzer
- type DirectorySink
- func (ds *DirectorySink) Analyze() error
- func (ds *DirectorySink) Close() error
- func (ds *DirectorySink) LoadSegment(offset Offset) (*Segment, error)
- func (ds *DirectorySink) NumSegments() int
- func (ds *DirectorySink) Offsets() (oldest, newest Offset)
- func (ds *DirectorySink) Truncate(offset Offset) error
- func (ds *DirectorySink) WriteSegment(seg *Segment) error
- type Logger
- type MemorySink
- func (s *MemorySink) Analyze() error
- func (s *MemorySink) Close() error
- func (s *MemorySink) LoadSegment(offset Offset) (*Segment, error)
- func (s *MemorySink) NumSegments() int
- func (s *MemorySink) Offsets() (first, last Offset)
- func (s *MemorySink) Truncate(offset Offset) error
- func (s *MemorySink) WriteSegment(seg *Segment) error
- type Offset
- type Option
- type Reader
- type Segment
- func (s *Segment) Chunk() chunk
- func (s *Segment) Chunks() int
- func (s *Segment) CurrentReadOffset() Offset
- func (s *Segment) EncodedSize() (int64, error)
- func (s *Segment) Limits() (oldest, newest Offset)
- func (s *Segment) Next() bool
- func (s *Segment) ReadFrom(r io.Reader) (int64, error)
- func (s *Segment) Remaining() int64
- func (s *Segment) Size() int64
- func (s *Segment) Truncate(offset Offset)
- func (s *Segment) Write(p []byte) (int, error)
- func (s *Segment) WriteTo(w io.Writer) (int64, error)
- type SegmentLoader
- type SegmentWriter
- type Sink
Constants ¶
const (
	// DefaultSegmentSize is the default size of a data segment (16MB).
	DefaultSegmentSize uint64 = 16777216
)
Variables ¶
var (
	ErrTooBig       = errors.New("wal: data too large for segment")
	ErrLoggerClosed = errors.New("wal: logger closed")
)
var (
	ErrNotEnoughSpace = errors.New("not enough space in segment")
	ErrSegmentFull    = errors.New("segment full")
)
var ZeroOffset = Offset(0)
ZeroOffset holds the value of the oldest-possible offset within a write-ahead logger.
Functions ¶
This section is empty.
Types ¶
type Analyzer ¶
type Analyzer interface {
Analyze() error
}
Analyzer defines the interface of a type that can perform analysis on a persistent storage medium for write-ahead logs.
type DirectorySink ¶
type DirectorySink struct {
// contains filtered or unexported fields
}
DirectorySink implements a Sink that can persist WAL segments to, and load them from, a directory.
The nomenclature of the on-disk WAL segment files is:
<chunkOffset0>-<chunkOffsetN>
where chunkOffset0 is the offset of the first data chunk in the segment, and chunkOffsetN is the offset of the last. As an example, for a segment holding data chunks written between January 1 2017 00:00 UTC and January 1 2017 01:00 UTC, the resulting segment's file name would be:
1483228800000000000-1483232400000000000
Each WAL segment file is accompanied by another file containing a checksum used for verifying the contents of the segment. The checksum file name, for the above segment, would be:
1483228800000000000-1483232400000000000.CHECKSUM
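The naming scheme above can be sketched as follows. The helper names segmentFileName and checksumFileName are hypothetical, and the sketch assumes offsets are rendered as plain decimal integers with no padding, which matches the example file names but is not otherwise confirmed here.

```go
package main

import "fmt"

// offset mirrors the package's Offset type, which is documented
// as an int64.
type offset int64

// segmentFileName builds "<first>-<last>" from the offsets of the
// first and last chunks in a segment. Hypothetical helper.
func segmentFileName(first, last offset) string {
	return fmt.Sprintf("%d-%d", first, last)
}

// checksumFileName appends the .CHECKSUM extension to the segment's
// file name. Hypothetical helper.
func checksumFileName(first, last offset) string {
	return segmentFileName(first, last) + ".CHECKSUM"
}

func main() {
	first := offset(1483228800000000000)
	last := offset(1483232400000000000)
	fmt.Println(segmentFileName(first, last))
	fmt.Println(checksumFileName(first, last))
}
```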
func NewDirectorySink ¶
func NewDirectorySink(dir string) (*DirectorySink, error)
NewDirectorySink returns a *DirectorySink that can read and write WAL segments to directory dir.
The permissions of dir will be checked to ensure the *DirectorySink can read and write to dir. If the directory does not exist, it will be created with mode 0777 (before umask).
func (*DirectorySink) Analyze ¶
func (ds *DirectorySink) Analyze() error
Analyze scans the directory the *DirectorySink was initialized with, and gathers all of the currently-available offsets.
This method also attempts to verify each found segment, by calculating a checksum of the segment file, and comparing it to the checksum in the segment's checksum file.
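The verification step might look something like the sketch below. The checksum algorithm and the checksum file's encoding are not stated in this documentation; SHA-256 with hex encoding is used here purely as an assumption for illustration, and the checksum and verify helpers are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// checksum returns the hex-encoded SHA-256 digest of a segment's
// bytes. SHA-256 is an assumption; the package may use a different
// algorithm or encoding.
func checksum(segment []byte) string {
	sum := sha256.Sum256(segment)
	return hex.EncodeToString(sum[:])
}

// verify reports whether the segment's computed checksum matches the
// value stored alongside it.
func verify(segment []byte, stored string) bool {
	return checksum(segment) == stored
}

func main() {
	seg := []byte("example segment contents")
	stored := checksum(seg)
	fmt.Println("intact segment verifies:", verify(seg, stored))

	// Flip the bits of the first byte to simulate on-disk corruption.
	corrupted := append([]byte(nil), seg...)
	corrupted[0] ^= 0xff
	fmt.Println("corrupted segment verifies:", verify(corrupted, stored))
}
```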
func (*DirectorySink) Close ¶
func (ds *DirectorySink) Close() error
Close implements the io.Closer interface.
In this particular Sink implementation, Close does nothing, as a DirectorySink does not hold any file descriptors open beyond the duration of a WriteSegment or LoadSegment call.
func (*DirectorySink) LoadSegment ¶
func (ds *DirectorySink) LoadSegment(offset Offset) (*Segment, error)
LoadSegment implements the SegmentLoader interface.
func (*DirectorySink) NumSegments ¶
func (ds *DirectorySink) NumSegments() int
NumSegments implements the Sink interface by returning the number of data segments currently known to the sink.
func (*DirectorySink) Offsets ¶
func (ds *DirectorySink) Offsets() (oldest, newest Offset)
Offsets returns the oldest and newest offsets known to the DirectorySink. Initially, the offsets are gathered by calling the Sink's Analyze() method. After initialization and analysis, the offset range is extended by each call to WriteSegment.
Offsets implements the Sink interface.
func (*DirectorySink) Truncate ¶
func (ds *DirectorySink) Truncate(offset Offset) error
Truncate implements the Sink interface.
Truncate will delete any on-disk segment files, along with their checksum files, if the last offset in the segment file is older than the given offset.
Should the offset fall within the offsets of a segment file, the segment file will be truncated, re-written to disk, and its checksum re-calculated.
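The two cases described above can be sketched in memory. Here a segment is modeled as nothing more than the sorted offsets of its chunks; the segment type and truncate function are hypothetical. For the partial case the sketch assumes chunks with offsets <= the given offset are dropped, following the wording of (*Segment).Truncate.

```go
package main

import "fmt"

// offset mirrors the package's Offset type (an int64).
type offset int64

// segment is a hypothetical stand-in for a segment file: the sorted
// offsets of the chunks it contains.
type segment []offset

// truncate returns the segments that survive truncation at off.
func truncate(segs []segment, off offset) []segment {
	var kept []segment
	for _, s := range segs {
		if len(s) == 0 {
			continue
		}
		first, last := s[0], s[len(s)-1]
		switch {
		case last < off:
			// The whole segment is older than off: delete it.
			continue
		case first > off:
			// The whole segment is newer than off: keep it as-is.
			kept = append(kept, s)
		default:
			// off falls within the segment: drop chunks <= off
			// (assumed inclusive) and keep the remainder, if any.
			i := 0
			for i < len(s) && s[i] <= off {
				i++
			}
			if i < len(s) {
				kept = append(kept, s[i:])
			}
		}
	}
	return kept
}

func main() {
	segs := []segment{{1, 2, 3}, {4, 5, 6}, {7, 8}}
	fmt.Println(truncate(segs, 5))
}
```

In the on-disk case, the partially truncated segment would also be re-written and its checksum recalculated, which this in-memory sketch leaves out.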
func (*DirectorySink) WriteSegment ¶
func (ds *DirectorySink) WriteSegment(seg *Segment) error
WriteSegment implements the SegmentWriter interface.
It will write each data segment out to a file, along with a second file with a .CHECKSUM extension.
type Logger ¶
type Logger struct {
// contains filtered or unexported fields
}
Logger is a type that can be used for maintaining write-ahead logs.
A Logger always maintains an "active" segment that data will be written to. For more details, see the Write method's documentation.
func (*Logger) Close ¶
Close persists the current segment, by writing it to the *Logger's Sink, then subsequently closes the Sink.
Close implements the io.Closer interface.
func (*Logger) Flush ¶
Flush locks the *Logger for writing, and writes the currently-active data segment to the *Logger's internal Sink. If the segment was successfully written, a new, empty segment is started, and the *Logger will be unlocked.
Attempting to call Flush after Close will return ErrLoggerClosed.
func (*Logger) NewReader ¶
NewReader returns a new *Reader that can sequentially read chunks of data from the earliest-known offset.
func (*Logger) NewReaderOffset ¶
NewReaderOffset returns a new *Reader that can be used to sequentially read chunks of data, starting at offset.
func (*Logger) Offsets ¶
Offsets returns the offsets of the first (oldest) and last (newest) data chunks.
func (*Logger) Truncate ¶
Truncate removes all data chunks whose offsets are <= offset.
This method attempts to call the underlying Sink's Truncate method, before truncating the current segment.
func (*Logger) Write ¶
Write implements the io.Writer interface for a *Logger.
When len(p) exceeds the amount of space left in the current segment, the current segment will be written to the *Logger's internal Sink, and a new segment will be started. Should len(p) be larger than the size of a new, empty segment, this method will return ErrTooBig.
Any attempt to write to a *Logger, after its Close method has been called, will yield ErrLoggerClosed.
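The rollover behaviour described above can be sketched with a toy logger. Everything here (toyLogger, its fields, and the lower-case error values) is hypothetical; the sketch ignores per-chunk encoding overhead, offsets, and locking, and simply collects flushed segments in a slice instead of handing them to a Sink.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's ErrTooBig and ErrLoggerClosed.
var (
	errTooBig = errors.New("data too large for segment")
	errClosed = errors.New("logger closed")
)

// toyLogger is a hypothetical, simplified model of a logger with one
// active segment of fixed capacity.
type toyLogger struct {
	segmentSize int        // capacity of a segment, in bytes
	active      [][]byte   // chunks in the active segment
	used        int        // bytes used in the active segment
	flushed     [][][]byte // segments already handed to the "sink"
	closed      bool
}

// Write appends p as one chunk, rolling over to a new segment when p
// does not fit in the space remaining in the active one.
func (l *toyLogger) Write(p []byte) (int, error) {
	if l.closed {
		return 0, errClosed
	}
	if len(p) > l.segmentSize {
		// p could never fit, even in a new, empty segment.
		return 0, errTooBig
	}
	if len(p) > l.segmentSize-l.used {
		// Not enough room: persist the active segment, start anew.
		l.flushed = append(l.flushed, l.active)
		l.active, l.used = nil, 0
	}
	l.active = append(l.active, append([]byte(nil), p...))
	l.used += len(p)
	return len(p), nil
}

func main() {
	l := &toyLogger{segmentSize: 8}
	for _, s := range []string{"abcd", "efg", "hi", "0123456789"} {
		_, err := l.Write([]byte(s))
		fmt.Printf("write %q: err=%v, flushed segments=%d\n", s, err, len(l.flushed))
	}
}
```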
type MemorySink ¶
type MemorySink struct {
// contains filtered or unexported fields
}
MemorySink is a Sink implementation that only stores data in memory.
func NewMemorySink ¶
func NewMemorySink() (*MemorySink, error)
NewMemorySink returns a Sink implementation that stores segments in memory.
func (*MemorySink) Analyze ¶
func (s *MemorySink) Analyze() error
func (*MemorySink) Close ¶
func (s *MemorySink) Close() error
func (*MemorySink) LoadSegment ¶
func (s *MemorySink) LoadSegment(offset Offset) (*Segment, error)
func (*MemorySink) NumSegments ¶
func (s *MemorySink) NumSegments() int
func (*MemorySink) Offsets ¶
func (s *MemorySink) Offsets() (first, last Offset)
func (*MemorySink) Truncate ¶
func (s *MemorySink) Truncate(offset Offset) error
func (*MemorySink) WriteSegment ¶
func (s *MemorySink) WriteSegment(seg *Segment) error
type Offset ¶
type Offset int64
Offset represents the offset of a data chunk within a write-ahead logger.
func NewOffset ¶
func NewOffset() Offset
NewOffset returns a new Offset for the current time. This is a shorthand for:
NewOffsetTime(time.Now())
func NewOffsetTime ¶
NewOffsetTime returns a new Offset for the given time.Time.
func ParseOffset ¶
ParseOffset returns an offset parsed from s.
type Option ¶
Option is a functional configuration type that can be used to configure the behaviour of a *Logger.
func SegmentSize ¶
SegmentSize sets the size of a data segment.
Depending on the Sink provided to the *Logger, setting n too low may cause excessive amounts of I/O, thus slowing everything down. Another potential problem is attempting to write data, where len(data) > n.
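For readers unfamiliar with functional options, the pattern can be sketched as follows. The underlying type of Option and the parameter type of SegmentSize are not shown in this documentation, so the config, option, segmentSize, and newConfig names below are hypothetical, and the uint64 parameter is assumed from the type of DefaultSegmentSize.

```go
package main

import "fmt"

// defaultSegmentSize mirrors the documented DefaultSegmentSize.
const defaultSegmentSize uint64 = 16777216

// config is a hypothetical holder for a logger's settings.
type config struct {
	segmentSize uint64
}

// option is a hypothetical functional option that mutates a config.
type option func(*config)

// segmentSize returns an option that sets the segment size to n.
func segmentSize(n uint64) option {
	return func(c *config) { c.segmentSize = n }
}

// newConfig starts from the defaults, then applies each option in
// the order given.
func newConfig(opts ...option) config {
	c := config{segmentSize: defaultSegmentSize}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().segmentSize)                     // default
	fmt.Println(newConfig(segmentSize(1 << 20)).segmentSize) // overridden
}
```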
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader loads data segments from a Sink, and progresses through a data segment until there are no more chunks to be read. When the end of a current segment is reached, a Reader will attempt to increment the last-known chunk offset by one, and load the next-available data segment.
It is not safe to call a Reader from multiple goroutines.
Example:
r := NewReader(sink)
for r.Next() {
	fmt.Printf("% x\n", r.Data())
}
if err := r.Error(); err != nil {
	log.Println("error:", err)
}
func NewReader ¶
NewReader returns a *Reader that reads data chunks from sink, starting at the earliest-possible offset.
func NewReaderOffset ¶
NewReaderOffset returns a *Reader that starts reading data chunks from sink, at the specified offset.
func (*Reader) Data ¶
Data returns the []byte of the current data chunk. Successive calls to Data, without calling Next, will return the same []byte.
type Segment ¶
type Segment struct {
// contains filtered or unexported fields
}
Segment is a size-bounded type that chunks are written to.
While a segment is safe for concurrent reading and writing, it is not recommended to do so.
func NewSegment ¶
func NewSegment() *Segment
func NewSegmentSize ¶
func (*Segment) Chunk ¶
func (s *Segment) Chunk() chunk
Chunk returns the current chunk. Successive calls to Chunk will yield the same chunk. To advance to the next chunk in the segment, call the Next() method.
func (*Segment) CurrentReadOffset ¶
CurrentReadOffset returns the offset of the chunk that will be returned by Chunk.
CurrentReadOffset will panic if it is called before Next.
func (*Segment) EncodedSize ¶
EncodedSize returns the encoded size of the segment, in bytes. This is the number of bytes that should be returned by WriteTo, assuming no more chunks are added to the segment.
func (*Segment) Limits ¶
Limits returns the oldest and newest offsets of the data chunks in the segment.
If there are no data chunks in the segment, this method will return ZeroOffset for both offsets.
If there is only one data chunk in the segment, that chunk's offset is returned for both oldest, and newest.
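The three cases above reduce to a few lines when a segment is modeled as a sorted slice of chunk offsets. The limits function and the slice representation are hypothetical and only illustrate the documented return values.

```go
package main

import "fmt"

// offset mirrors the package's Offset type (an int64).
type offset int64

// zeroOffset mirrors the package's ZeroOffset.
const zeroOffset offset = 0

// limits returns the oldest and newest offsets in a sorted slice of
// chunk offsets, or zeroOffset for both when the slice is empty.
func limits(chunkOffsets []offset) (oldest, newest offset) {
	if len(chunkOffsets) == 0 {
		return zeroOffset, zeroOffset
	}
	return chunkOffsets[0], chunkOffsets[len(chunkOffsets)-1]
}

func main() {
	fmt.Println(limits(nil))                  // empty segment
	fmt.Println(limits([]offset{42}))         // single chunk
	fmt.Println(limits([]offset{10, 20, 30})) // several chunks
}
```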
func (*Segment) Next ¶
Next reports whether or not there is another chunk that can be read with the Chunk() method.
For example:
for s.Next() {
	c := s.Chunk()
	...
}
func (*Segment) ReadFrom ¶
ReadFrom implements the io.ReaderFrom interface, and is primarily used to load a segment from disk.
Calling ReadFrom on a non-empty segment will return a non-nil error.
func (*Segment) Remaining ¶
Remaining returns the number of bytes left before the segment is at capacity.
func (*Segment) Truncate ¶
Truncate removes all chunks from the segment, whose offsets are <= offset.
If the current segment is being read, the internal pointer of the chunk to read will be adjusted.
type SegmentLoader ¶
type SegmentLoader interface {
	// LoadSegment returns the WAL segment containing the given Offset.
	//
	// If ZeroOffset is specified, then the segment with the lowest
	// offset will be returned.
	//
	// Should the given offset be greater than one contained in any
	// available segments, no segment will be returned, and err will be
	// io.EOF.
	LoadSegment(Offset) (*Segment, error)
}
SegmentLoader defines the interface of a type that can retrieve segments from their persistent storage medium.
type SegmentWriter ¶
SegmentWriter defines the interface of a type that is able to store WAL segments.
type Sink ¶
type Sink interface {
	Analyzer
	SegmentLoader
	SegmentWriter
	io.Closer

	// Offsets returns the first, and last (most-recent) offsets known
	// to a Sink.
	Offsets() (first Offset, last Offset)

	// NumSegments returns the number of segments currently known to
	// the sink.
	NumSegments() int

	// Truncate permanently deletes all data chunks prior to the given
	// offset.
	Truncate(Offset) error
}
Sink defines the interface of a type that can persist, and subsequently load, write-ahead logging segments.