wal

package
v0.0.0-...-8a5471c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2024 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultIOBufSize = 4 * 1024

DefaultIOBufSize is the default buffer size for bufio.Writer.

Variables

View Source
var (
	ErrNotWALSegment     = errors.New("not a WAL segment")
	ErrInvalidWALSegment = errors.New("invalid WAL segment")
)
View Source
var (
	ErrMaxDiskUsageExceeded = fmt.Errorf("max disk usage exceeded")
	ErrMaxSegmentsExceeded  = fmt.Errorf("max segments exceeded")
	ErrSegmentClosed        = fmt.Errorf("segment closed")
	ErrSegmentLocked        = fmt.Errorf("segment locked")
)

Functions

func Filename

func Filename(database, table, schema, epoch string) string

func HasSampleMetadata

func HasSampleMetadata(b []byte) bool

func IsSegment

func IsSegment(path string) bool

IsSegment returns true if the file is a valid segment file.

func NewSegmentMerger

func NewSegmentMerger(paths ...string) (io.ReadCloser, error)

func ParseFilename

func ParseFilename(path string) (database, table, schema, epoch string, err error)

func WithSkipHeader

func WithSkipHeader(r *SegmentReader)

Types

type File

type File struct {
	Path     string
	Database string
	Table    string
	Schema   string
	Epoch    string
	Key      string
}

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index provides overview of all segments in a repository.

func NewIndex

func NewIndex() *Index

NewIndex returns a new index.

func (*Index) Add

func (i *Index) Add(s SegmentInfo)

Add adds a segment to the index.

func (*Index) Get

func (i *Index) Get(prefix string) []SegmentInfo

Get returns all segments for a given prefix.

func (*Index) LargestCountPrefix

func (i *Index) LargestCountPrefix() string

LargestCountPrefix returns the prefix of the segment with the largest total count.

func (*Index) LargestSizePrefix

func (i *Index) LargestSizePrefix() string

LargestSizePrefix returns the prefix of the segment with the largest total size.

func (*Index) OldestPrefix

func (i *Index) OldestPrefix() string

Oldest returns the prefix of the oldest segment.

func (*Index) PrefixesByAge

func (i *Index) PrefixesByAge() []string

PrefixesByAge returns all prefixes sorted by oldest to newest.

func (*Index) PrefixesByCount

func (i *Index) PrefixesByCount() []string

PrefixesByCount returns all prefixes sorted by total count least to greatest. If there is a tie, the prefix that is lexicographically first is returned.

func (*Index) PrefixesBySize

func (i *Index) PrefixesBySize() []string

PrefixesBySize returns all prefixes sorted by total size least to greatest.

func (*Index) Remove

func (i *Index) Remove(s SegmentInfo)

Remove removes a segment from the index.

func (*Index) TotalPrefixes

func (i *Index) TotalPrefixes() int

TotalPrefixes returns the total number of prefixes in the index.

func (*Index) TotalSegments

func (i *Index) TotalSegments() int

TotalSegments returns the total number of segments in the index.

func (*Index) TotalSize

func (i *Index) TotalSize() int64

type Iterator

type Iterator interface {
	Next() (bool, error)
	Value() []byte
	Close() error
	// Verify ensures the Iterator can iterate over a continuous series of blocks without error.
	Verify() (int, error)
	Metadata() (SampleType, uint32)
}

func NewSegmentIterator

func NewSegmentIterator(r io.ReadCloser) (Iterator, error)

type Option

type Option func(*SegmentReader)

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository is a collection of WALs.

func NewRepository

func NewRepository(opts RepositoryOpts) *Repository

func (*Repository) Close

func (s *Repository) Close() error

func (*Repository) Count

func (s *Repository) Count() int

func (*Repository) Get

func (s *Repository) Get(ctx context.Context, key []byte) (*WAL, error)

func (*Repository) Index

func (s *Repository) Index() *Index

func (*Repository) Keys

func (s *Repository) Keys() [][]byte

func (*Repository) Open

func (s *Repository) Open(ctx context.Context) error

func (*Repository) PrefixesByAge

func (s *Repository) PrefixesByAge() []string

func (*Repository) Remove

func (s *Repository) Remove(key []byte) error

func (*Repository) RemoveSegment

func (s *Repository) RemoveSegment(si SegmentInfo)

type RepositoryOpts

type RepositoryOpts struct {
	StorageDir     string
	SegmentMaxSize int64
	SegmentMaxAge  time.Duration

	MaxDiskUsage     int64
	MaxSegmentCount  int
	WALFlushInterval time.Duration
}

type SampleType

type SampleType uint16
const (
	UnknownSampleType SampleType = iota
	MetricSampleType
	TraceSampleType
	LogSampleType
)

func SampleMetadata

func SampleMetadata(b []byte) (t SampleType, count uint32)

type Segment

type Segment interface {
	Append(ctx context.Context, buf []byte) error
	Write(ctx context.Context, buf []byte, opts ...WriteOptions) error
	Bytes() ([]byte, error)
	Close() error
	ID() string
	Size() (int64, error)
	CreatedAt() time.Time
	Reader() (io.ReadCloser, error)
	Path() string

	Iterator() (Iterator, error)
	Info() (SegmentInfo, error)
	Flush() error
	// Repair truncates the last bytes in the segment if they are missing, corrupted or have extra data.  This
	// repairs any corrupted segment that may not have fully flushed to disk safely.  The segment is truncated
	// from the first block that is found to be corrupt.
	Repair() error
}

func NewSegment

func NewSegment(dir, prefix string, opts ...SegmentOption) (Segment, error)

func Open

func Open(path string) (Segment, error)

type SegmentInfo

type SegmentInfo struct {
	Prefix    string
	Ulid      string
	Path      string
	Size      int64
	CreatedAt time.Time
}

type SegmentMerger

type SegmentMerger struct {
	// contains filtered or unexported fields
}

func (*SegmentMerger) Close

func (m *SegmentMerger) Close() error

func (*SegmentMerger) Read

func (m *SegmentMerger) Read(p []byte) (n int, err error)

type SegmentOption

type SegmentOption func(s *segment)

func WithFlushIntervale

func WithFlushIntervale(d time.Duration) SegmentOption

type SegmentReader

type SegmentReader struct {
	// contains filtered or unexported fields
}

func NewSegmentReader

func NewSegmentReader(path string, opts ...Option) (*SegmentReader, error)

func (*SegmentReader) Close

func (s *SegmentReader) Close() error

func (*SegmentReader) Read

func (s *SegmentReader) Read(p []byte) (n int, err error)

func (*SegmentReader) SampleMetadata

func (s *SegmentReader) SampleMetadata() (t SampleType, sampleCount uint32)

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

func NewWAL

func NewWAL(opts WALOpts) (*WAL, error)

func (*WAL) Append

func (w *WAL) Append(ctx context.Context, buf []byte) error

func (*WAL) Close

func (w *WAL) Close() error

func (*WAL) Flush

func (w *WAL) Flush() error

func (*WAL) Open

func (w *WAL) Open(ctx context.Context) error

func (*WAL) Path

func (w *WAL) Path() string

Path returns the path of the active segment.

func (*WAL) Remove

func (w *WAL) Remove(path string) error

func (*WAL) RemoveAll

func (w *WAL) RemoveAll() error

func (*WAL) Segment

func (w *WAL) Segment() Segment

func (*WAL) Size

func (w *WAL) Size() int

func (*WAL) Write

func (w *WAL) Write(ctx context.Context, buf []byte, opts ...WriteOptions) error

type WALOpts

type WALOpts struct {
	StorageDir string

	// WAL segment prefix
	Prefix string

	// SegmentMaxSize is the max size of a segment in bytes before it will be rotated and compressed.
	SegmentMaxSize int64

	// SegmentMaxAge is the max age of a segment before it will be rotated and compressed.
	SegmentMaxAge time.Duration

	// MaxDiskUsage is the max disk usage of WAL segments allowed before writes should be rejected.
	MaxDiskUsage int64

	// MaxSegmentCount is the max number of segments allowed before writes should be rejected.
	MaxSegmentCount int

	// Index is the index of the WAL segments.
	Index *Index

	// WALFlushInterval is the interval at which the WAL should be flushed.
	WALFlushInterval time.Duration
}

type WriteOptions

type WriteOptions func([]byte)

func WithSampleMetadata

func WithSampleMetadata(t SampleType, count uint32) WriteOptions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL