Documentation ¶
Index ¶
- Constants
- Variables
- func Filename(database, table, schema, epoch string) string
- func HasSampleMetadata(b []byte) bool
- func IsSegment(path string) bool
- func NewSegmentMerger(paths ...string) (io.ReadCloser, error)
- func ParseFilename(path string) (database, table, schema, epoch string, err error)
- func WithSkipHeader(r *SegmentReader)
- type File
- type Index
- func (i *Index) Add(s SegmentInfo)
- func (i *Index) Get(prefix string) []SegmentInfo
- func (i *Index) LargestCountPrefix() string
- func (i *Index) LargestSizePrefix() string
- func (i *Index) OldestPrefix() string
- func (i *Index) PrefixesByAge() []string
- func (i *Index) PrefixesByCount() []string
- func (i *Index) PrefixesBySize() []string
- func (i *Index) Remove(s SegmentInfo)
- func (i *Index) TotalPrefixes() int
- func (i *Index) TotalSegments() int
- func (i *Index) TotalSize() int64
- type Iterator
- type Option
- type Repository
- func (s *Repository) Close() error
- func (s *Repository) Count() int
- func (s *Repository) Get(ctx context.Context, key []byte) (*WAL, error)
- func (s *Repository) Index() *Index
- func (s *Repository) Keys() [][]byte
- func (s *Repository) Open(ctx context.Context) error
- func (s *Repository) PrefixesByAge() []string
- func (s *Repository) Remove(key []byte) error
- func (s *Repository) RemoveSegment(si SegmentInfo)
- type RepositoryOpts
- type SampleType
- type Segment
- type SegmentInfo
- type SegmentMerger
- type SegmentOption
- type SegmentReader
- type WAL
- func (w *WAL) Append(ctx context.Context, buf []byte) error
- func (w *WAL) Close() error
- func (w *WAL) Flush() error
- func (w *WAL) Open(ctx context.Context) error
- func (w *WAL) Path() string
- func (w *WAL) Remove(path string) error
- func (w *WAL) RemoveAll() error
- func (w *WAL) Segment() Segment
- func (w *WAL) Size() int
- func (w *WAL) Write(ctx context.Context, buf []byte, opts ...WriteOptions) error
- type WALOpts
- type WriteOptions
Constants ¶
const DefaultIOBufSize = 4 * 1024
DefaultIOBufSize is the default buffer size for bufio.Writer.
Variables ¶
var ( ErrNotWALSegment = errors.New("not a WAL segment") ErrInvalidWALSegment = errors.New("invalid WAL segment") )
Functions ¶
func HasSampleMetadata ¶
func NewSegmentMerger ¶
func NewSegmentMerger(paths ...string) (io.ReadCloser, error)
func ParseFilename ¶
func WithSkipHeader ¶
func WithSkipHeader(r *SegmentReader)
Types ¶
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
Index provides an overview of all segments in a repository.
func (*Index) Get ¶
func (i *Index) Get(prefix string) []SegmentInfo
Get returns all segments for a given prefix.
func (*Index) LargestCountPrefix ¶
LargestCountPrefix returns the prefix of the segment with the largest total count.
func (*Index) LargestSizePrefix ¶
LargestSizePrefix returns the prefix of the segment with the largest total size.
func (*Index) OldestPrefix ¶
OldestPrefix returns the prefix of the oldest segment.
func (*Index) PrefixesByAge ¶
PrefixesByAge returns all prefixes sorted from oldest to newest.
func (*Index) PrefixesByCount ¶
PrefixesByCount returns all prefixes sorted by total count, from least to greatest. If there is a tie, the prefix that is lexicographically first is ordered first.
func (*Index) PrefixesBySize ¶
PrefixesBySize returns all prefixes sorted by total size, from least to greatest.
func (*Index) Remove ¶
func (i *Index) Remove(s SegmentInfo)
Remove removes a segment from the index.
func (*Index) TotalPrefixes ¶
TotalPrefixes returns the total number of prefixes in the index.
func (*Index) TotalSegments ¶
TotalSegments returns the total number of segments in the index.
type Iterator ¶
type Iterator interface { Next() (bool, error) Value() []byte Close() error // Verify ensures the Iterator can iterate over a continuous series of blocks without error. Verify() (int, error) Metadata() (SampleType, uint32) }
func NewSegmentIterator ¶
func NewSegmentIterator(r io.ReadCloser) (Iterator, error)
type Option ¶
type Option func(*SegmentReader)
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
Repository is a collection of WALs.
func NewRepository ¶
func NewRepository(opts RepositoryOpts) *Repository
func (*Repository) Close ¶
func (s *Repository) Close() error
func (*Repository) Count ¶
func (s *Repository) Count() int
func (*Repository) Index ¶
func (s *Repository) Index() *Index
func (*Repository) Keys ¶
func (s *Repository) Keys() [][]byte
func (*Repository) PrefixesByAge ¶
func (s *Repository) PrefixesByAge() []string
func (*Repository) Remove ¶
func (s *Repository) Remove(key []byte) error
func (*Repository) RemoveSegment ¶
func (s *Repository) RemoveSegment(si SegmentInfo)
type RepositoryOpts ¶
type SampleType ¶
type SampleType uint16
const ( UnknownSampleType SampleType = iota MetricSampleType TraceSampleType LogSampleType )
func SampleMetadata ¶
func SampleMetadata(b []byte) (t SampleType, count uint32)
type Segment ¶
type Segment interface { Append(ctx context.Context, buf []byte) error Write(ctx context.Context, buf []byte, opts ...WriteOptions) error Bytes() ([]byte, error) Close() error ID() string Size() (int64, error) CreatedAt() time.Time Reader() (io.ReadCloser, error) Path() string Iterator() (Iterator, error) Info() (SegmentInfo, error) Flush() error // Repair truncates the last bytes in the segment if they are missing, corrupted or have extra data. This // repairs any corrupted segment that may not have fully flushed to disk safely. The segment is truncated // from the first block that is found to be corrupt. Repair() error }
func NewSegment ¶
func NewSegment(dir, prefix string, opts ...SegmentOption) (Segment, error)
type SegmentInfo ¶
type SegmentMerger ¶
type SegmentMerger struct {
// contains filtered or unexported fields
}
func (*SegmentMerger) Close ¶
func (m *SegmentMerger) Close() error
type SegmentOption ¶
type SegmentOption func(s *segment)
func WithFlushIntervale ¶
func WithFlushIntervale(d time.Duration) SegmentOption
type SegmentReader ¶
type SegmentReader struct {
// contains filtered or unexported fields
}
func NewSegmentReader ¶
func NewSegmentReader(path string, opts ...Option) (*SegmentReader, error)
func (*SegmentReader) Close ¶
func (s *SegmentReader) Close() error
func (*SegmentReader) SampleMetadata ¶
func (s *SegmentReader) SampleMetadata() (t SampleType, sampleCount uint32)
type WALOpts ¶
type WALOpts struct { StorageDir string // WAL segment prefix Prefix string // SegmentMaxSize is the max size of a segment in bytes before it will be rotated and compressed. SegmentMaxSize int64 // SegmentMaxAge is the max age of a segment before it will be rotated and compressed. SegmentMaxAge time.Duration // MaxDiskUsage is the max disk usage of WAL segments allowed before writes should be rejected. MaxDiskUsage int64 // MaxSegmentCount is the max number of segments allowed before writes should be rejected. MaxSegmentCount int // Index is the index of the WAL segments. Index *Index // WALFlushInterval is the interval at which the WAL should be flushed. WALFlushInterval time.Duration }
type WriteOptions ¶
type WriteOptions func([]byte)
func WithSampleMetadata ¶
func WithSampleMetadata(t SampleType, count uint32) WriteOptions