logbuffer

package
v0.11.4
Published: Aug 9, 2023 License: MPL-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package logbuffer provides a structure and API for efficiently reading and writing logs that may be streamed to a server.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is a data structure for buffering logs with concurrent read/write access.

Callers use a simple API to write and read data; storage and access are managed underneath. If a reader falls significantly behind a writer, the next read may "jump" forward to catch up. There is currently no way to explicitly detect a jump.

Writer

The writer calls Write on the buffer as it gets log entries. Multiple writers are safe to use. The buffer will always successfully write all entries, though it may result in extra allocations.

buf.Write(entries...)

Reader

A reader calls buf.Reader to get a Reader structure, and then calls Read to read values from the buffer. The Reader structure is used to maintain per-reader cursors so that multiple readers can exist at multiple points.

Internal Details

A buffer is structured as a sliding window over a set of "chunks". A chunk is a set of log entries. As you write into the buffer, the buffer will append to the current chunk until it is full, then move to the next chunk. When all the chunks are full, the buffer will allocate a new set of chunks.

The break into "chunks" is done for two reasons. The first is to prevent overallocation; we don't need to allocate a lot of buffer space, only enough for the current chunk. Second, to avoid lock contention. Once a chunk is full, it will never be written to again so we never need to acquire a lock to read the data. This makes reading backlogs very fast.
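The chunking idea above can be illustrated with a minimal sketch. This is not the package's implementation: the chunkedLog type, the chunkSize value, and the method names are hypothetical, and the sketch grows a simple list of chunks instead of maintaining a sliding window. It only shows entries being appended to the current chunk until it is full, after which a new chunk is started and the full one is never written again.

```go
package main

import (
	"fmt"
	"sync"
)

// chunkSize is a hypothetical capacity chosen for illustration only.
const chunkSize = 4

// chunk holds a fixed-capacity run of entries. Once it is full it is
// never written to again, so it could be read without taking a lock.
type chunk struct {
	entries []interface{}
}

// chunkedLog is an illustrative append-only log split into chunks.
type chunkedLog struct {
	mu     sync.Mutex
	chunks []*chunk
}

// Append writes entries, starting a new chunk whenever the current one is full.
func (l *chunkedLog) Append(entries ...interface{}) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, e := range entries {
		n := len(l.chunks)
		if n == 0 || len(l.chunks[n-1].entries) == chunkSize {
			// Allocate only enough space for one more chunk.
			l.chunks = append(l.chunks, &chunk{entries: make([]interface{}, 0, chunkSize)})
		}
		cur := l.chunks[len(l.chunks)-1]
		cur.entries = append(cur.entries, e)
	}
}

// Chunks reports how many chunks have been allocated so far.
func (l *chunkedLog) Chunks() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return len(l.chunks)
}

// Len reports the total number of entries across all chunks.
func (l *chunkedLog) Len() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	total := 0
	for _, c := range l.chunks {
		total += len(c.entries)
	}
	return total
}

func main() {
	var l chunkedLog
	for i := 0; i < 10; i++ {
		l.Append(i)
	}
	// Ten entries with a chunk size of four occupy three chunks (4, 4, 2).
	fmt.Println(l.Chunks(), l.Len())
}
```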

func New

func New() *Buffer

New creates a new Buffer.

func (*Buffer) Close

func (b *Buffer) Close() error

Close closes this log buffer. This will immediately close all active readers and further writes will do nothing.

func (*Buffer) Reader

func (b *Buffer) Reader(maxHistory int32) *Reader

Reader returns a shared reader for this buffer. The Reader provides an easy-to-use API to read log entries.

maxHistory limits the number of elements in the backlog. A maxHistory of zero moves the cursor to the latest entry. A maxHistory less than zero does not limit history at all, and the full backlog is available to read.
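As a rough illustration of the maxHistory rules, the sketch below computes where a new reader's cursor would start for a given number of buffered entries. The startCursor function is hypothetical and not part of the package. It also assumes that a maxHistory of zero means the reader sees only entries written after it was created, which the description above does not state explicitly.

```go
package main

import "fmt"

// startCursor returns the index of the first entry a new reader would see,
// given `total` buffered entries. It is a hypothetical helper that mirrors
// the documented maxHistory rules; it is not the package's code.
func startCursor(total, maxHistory int32) int32 {
	switch {
	case maxHistory < 0:
		return 0 // no limit: the full backlog is readable
	case maxHistory >= total:
		return 0 // the backlog is smaller than the limit
	default:
		// Keep only the last maxHistory entries. With maxHistory == 0 this
		// is `total`, so only entries written later are read (assumption).
		return total - maxHistory
	}
}

func main() {
	fmt.Println(startCursor(100, -1)) // 0
	fmt.Println(startCursor(100, 10)) // 90
	fmt.Println(startCursor(100, 0))  // 100
}
```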

func (*Buffer) Write

func (b *Buffer) Write(entries ...Entry)

Write writes the set of entries into the buffer.

This is safe for concurrent access.

type Entry

type Entry interface{}

Entry is just an interface{} type. Buffer doesn't care what the entries are, since it assumes they arrive in some order and are read back in that same order.

type MergeReader

type MergeReader interface {
	NextTimedEntry() (TimedEntry, error)
}

MergeReader is a value that returns TimedEntry values for Merger to weave together.

type Merger

type Merger struct {
	// contains filtered or unexported fields
}

Merger can combine multiple log streams into one stream. It presumes each stream emits entries in time order, and then weaves the entries together to create a total time ordered stream.

func NewMerger

func NewMerger(readers ...MergeReader) *Merger

NewMerger creates a new Merger, with the stream generated from the given inputs.

func (*Merger) Read

func (l *Merger) Read(count int) ([]ReaderEntry, error)

Read returns a slice of ReaderEntry values that are next in total time order. The result might contain fewer than count values, depending on what is available.
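The weaving that Merger performs can be sketched as a merge of already-sorted streams. The code below is a self-contained illustration, not the package's Merger: it redeclares the TimedEntry and MergeReader interfaces locally, and the line, sliceReader, and mergeAll names are hypothetical. It assumes every input signals exhaustion with io.EOF and drains all inputs at once rather than reading a bounded count.

```go
package main

import (
	"fmt"
	"io"
	"time"
)

// Local copies of the documented interfaces so the sketch is self-contained.
type TimedEntry interface {
	Time() time.Time
	Value() interface{}
}

type MergeReader interface {
	NextTimedEntry() (TimedEntry, error)
}

// line is a hypothetical entry type carrying a timestamp and some text.
type line struct {
	at   time.Time
	text string
}

func (l line) Time() time.Time    { return l.at }
func (l line) Value() interface{} { return l.text }

// sliceReader is a hypothetical MergeReader backed by a time-ordered slice.
type sliceReader struct{ entries []TimedEntry }

func (s *sliceReader) NextTimedEntry() (TimedEntry, error) {
	if len(s.entries) == 0 {
		return nil, io.EOF
	}
	e := s.entries[0]
	s.entries = s.entries[1:]
	return e, nil
}

// mergeAll drains every reader and returns the entries in total time order.
// It keeps one pending "head" entry per reader and repeatedly emits the earliest.
func mergeAll(readers ...MergeReader) ([]TimedEntry, error) {
	heads := make([]TimedEntry, len(readers))
	done := make([]bool, len(readers))
	var out []TimedEntry
	for {
		best := -1
		for i, r := range readers {
			if heads[i] == nil && !done[i] {
				e, err := r.NextTimedEntry()
				if err == io.EOF {
					done[i] = true
					continue
				}
				if err != nil {
					return out, err
				}
				heads[i] = e
			}
			if heads[i] != nil && (best < 0 || heads[i].Time().Before(heads[best].Time())) {
				best = i
			}
		}
		if best < 0 {
			return out, nil // every reader is exhausted
		}
		out = append(out, heads[best])
		heads[best] = nil
	}
}

// mergedTexts merges two small streams and returns the text of each entry.
func mergedTexts() []string {
	base := time.Unix(0, 0)
	a := &sliceReader{entries: []TimedEntry{
		line{at: base.Add(1 * time.Second), text: "a1"},
		line{at: base.Add(3 * time.Second), text: "a3"},
	}}
	b := &sliceReader{entries: []TimedEntry{
		line{at: base.Add(2 * time.Second), text: "b2"},
		line{at: base.Add(4 * time.Second), text: "b4"},
	}}
	merged, _ := mergeAll(a, b)
	var texts []string
	for _, e := range merged {
		texts = append(texts, e.Value().(string))
	}
	return texts
}

func main() {
	fmt.Println(mergedTexts()) // [a1 b2 a3 b4]
}
```

A linear scan over the pending heads is enough for a handful of inputs; with many inputs a heap keyed on Time() would be the usual alternative.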

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads log entry values from a buffer.

Each Reader maintains its own read cursor. This allows multiple readers to exist across a Buffer at multiple points. Subsequent calls to Read may "jump" across time if a reader falls behind the writer.

It is not safe to call Read concurrently. If you want concurrent read access you can either create multiple readers or protect Read with a lock. You may call Close concurrently with Read.

func (*Reader) Close

func (r *Reader) Close() error

Close closes the reader. This will cause all future Read calls to return immediately with a nil result. This will also immediately unblock any currently blocked Reads.

This is safe to call concurrently with Read.

func (*Reader) CloseContext

func (r *Reader) CloseContext(ctx context.Context)

CloseContext will block until ctx is done and then close the reader. This can be called multiple times to register multiple context values to close the reader on.
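Because CloseContext blocks until the context is done, callers would typically run it in its own goroutine. The sketch below shows that pattern with a hypothetical closer type standing in for Reader; none of these names come from the package. Returning early when the value has already been closed is an added assumption, not documented behaviour.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for a Reader that can be closed once.
type closer struct {
	once sync.Once
	done chan struct{}
}

func newCloser() *closer { return &closer{done: make(chan struct{})} }

// Close is idempotent; it unblocks anything waiting on c.done.
func (c *closer) Close() error {
	c.once.Do(func() { close(c.done) })
	return nil
}

// isClosed reports whether Close has been called.
func (c *closer) isClosed() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

// CloseContext blocks until ctx is done and then closes c. It also returns
// if c is closed by other means, so the goroutine does not linger (assumption).
func (c *closer) CloseContext(ctx context.Context) {
	select {
	case <-ctx.Done():
		c.Close()
	case <-c.done:
	}
}

// closesOnCancel runs CloseContext in a goroutine, cancels the context,
// and reports whether the closer ended up closed.
func closesOnCancel() bool {
	c := newCloser()
	ctx, cancel := context.WithCancel(context.Background())
	finished := make(chan struct{})
	go func() {
		c.CloseContext(ctx)
		close(finished)
	}()
	cancel()
	<-finished
	return c.isClosed()
}

func main() {
	fmt.Println(closesOnCancel()) // true
}
```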

func (*Reader) NextTimedEntry

func (r *Reader) NextTimedEntry() (TimedEntry, error)

NextTimedEntry returns the next entry. Merger uses this to merge the results of multiple readers, ordering values by their ingest time. It never blocks and returns io.EOF if there are no further values.

func (*Reader) Read

func (r *Reader) Read(max int, block bool) []Entry

Read returns a batch of log entries, up to "max" amount. If "max" isn't available, this will return any number that currently exists. If zero exist and block is true, this will block waiting for available entries. If block is false and no more log entries exist, this will return nil.
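The blocking and non-blocking behaviour of Read can be sketched with a mutex and a condition variable. This is an illustration only: the queue type is hypothetical, it folds the buffer and a single reader cursor into one struct, and it does not reflect how the package stores entries.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical single-reader log used to illustrate Read semantics.
type queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []interface{}
	cursor int // index of the next unread entry
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Write appends entries and wakes any blocked Read.
func (q *queue) Write(entries ...interface{}) {
	q.mu.Lock()
	q.items = append(q.items, entries...)
	q.mu.Unlock()
	q.cond.Broadcast()
}

// Read returns up to max unread entries. If none exist it returns nil when
// block is false, or waits for a Write when block is true.
func (q *queue) Read(max int, block bool) []interface{} {
	if max <= 0 {
		return nil
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.cursor == len(q.items) {
		if !block {
			return nil
		}
		q.cond.Wait() // releases the lock while waiting
	}
	end := q.cursor + max
	if end > len(q.items) {
		end = len(q.items) // fewer than max are available
	}
	out := append([]interface{}(nil), q.items[q.cursor:end]...)
	q.cursor = end
	return out
}

func main() {
	q := newQueue()
	fmt.Println(q.Read(5, false) == nil) // true: nothing written yet
	q.Write("a", "b", "c")
	fmt.Println(len(q.Read(2, false))) // 2
	fmt.Println(len(q.Read(5, false))) // 1: only one entry was left
}
```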

type ReaderEntry

type ReaderEntry struct {
	TimedEntry
	Reader MergeReader
}

ReaderEntry is returned by Merger.Read. It provides access to the next TimedEntry as well as the input that generated it. This type is important because it lets the caller recover the context of an entry from its input. Merger effectively shuffles the values put into it, so the caller has to deal with entries from different inputs appearing in any relative order, and the input provides the critical context.

type TimedEntries

type TimedEntries []TimedEntry

TimedEntries is a convenience type, a slice of TimedEntry values, that implements the MergeReader interface.

func (*TimedEntries) NextTimedEntry

func (t *TimedEntries) NextTimedEntry() (TimedEntry, error)

NextTimedEntry returns the next value in the slice and then shrinks the slice by removing that value.

type TimedEntry

type TimedEntry interface {
	// Time is the associated time of the value. This is used to sort
	// the entry against other entries from other inputs.
	Time() time.Time

	// Value returns the log entry itself. This allows time addition
	// wrappers to return their original value.
	Value() interface{}
}

TimedEntry is the interface through which each input returns its entries.
