journal

package
v0.10.0 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 24, 2024 License: MIT Imports: 17 Imported by: 1

Documentation

Overview

Package journal provides an abstraction of an append-only log with optimistic concurrency control.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IgnoreNotFound added in v0.9.3

func IgnoreNotFound(err error) error

IgnoreNotFound returns nil if err is caused by a RecordNotFoundError or a ValueNotFoundError. Otherwise it returns err unchanged.

func IsConflict added in v0.10.0

func IsConflict(err error) bool

IsConflict returns true if err is caused by ConflictError.

func IsEmpty added in v0.4.0

func IsEmpty[T any](ctx context.Context, j Journal[T]) (bool, error)

IsEmpty returns true if j does not currently contain any records.

func IsFresh added in v0.4.0

func IsFresh[T any](ctx context.Context, j Journal[T]) (bool, error)

IsFresh returns true if j has never contained any records.
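The difference between an empty journal and a fresh one only appears after truncation. The sketch below works from a journal's bounds alone, using local stand-in types rather than the real package. It assumes positions are never reused, so a journal whose End is still 0 has never been appended to. The helper names isEmpty and isFresh are hypothetical.

```go
package main

import "fmt"

// Position and Interval are local stand-ins mirroring the documented types.
type Position uint64

type Interval struct {
	Begin Position // position of the first record
	End   Position // position immediately after the last record
}

// isEmpty reports whether the bounds currently contain no records.
func isEmpty(b Interval) bool { return b.Begin >= b.End }

// isFresh reports whether the journal has never contained a record. It
// assumes positions are never reused, so End only stays at 0 until the
// first append.
func isFresh(b Interval) bool { return b.End == 0 }

func main() {
	cases := []Interval{
		{Begin: 0, End: 0}, // never written
		{Begin: 0, End: 3}, // three records
		{Begin: 3, End: 3}, // three records written, then all truncated
	}
	for _, b := range cases {
		fmt.Printf("bounds=[%d, %d) empty=%t fresh=%t\n", b.Begin, b.End, isEmpty(b), isFresh(b))
	}
}
```

A fully truncated journal is empty but not fresh, which is the case the two functions exist to tell apart.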

func IsNotFound added in v0.10.0

func IsNotFound(err error) bool

IsNotFound returns true if err is caused by a RecordNotFoundError or a ValueNotFoundError.
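The error helpers IgnoreNotFound, IsConflict and IsNotFound can be approximated with errors.As, which also matches wrapped errors. The sketch below is an assumption about their behavior, not the package's implementation. The error types are local stand-ins that copy the documented struct fields, the error messages are invented, and the lower-case helper names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Position and the error types below are local stand-ins that mirror the
// documented declarations; they are not imported from the real package.
type Position uint64

type RecordNotFoundError struct {
	Journal  string
	Position Position
}

func (e RecordNotFoundError) Error() string {
	return fmt.Sprintf("journal %q has no record at position %d", e.Journal, e.Position)
}

type ValueNotFoundError struct{}

func (ValueNotFoundError) Error() string { return "value not found" }

type ConflictError struct {
	Journal  string
	Position Position
}

func (e ConflictError) Error() string {
	return fmt.Sprintf("journal %q already has a record at position %d", e.Journal, e.Position)
}

// isNotFound is an assumed equivalent of IsNotFound. It uses errors.As so
// that wrapped errors are recognized too.
func isNotFound(err error) bool {
	var r RecordNotFoundError
	var v ValueNotFoundError
	return errors.As(err, &r) || errors.As(err, &v)
}

// isConflict is an assumed equivalent of IsConflict.
func isConflict(err error) bool {
	var c ConflictError
	return errors.As(err, &c)
}

// ignoreNotFound is an assumed equivalent of IgnoreNotFound.
func ignoreNotFound(err error) error {
	if isNotFound(err) {
		return nil
	}
	return err
}

func main() {
	wrapped := fmt.Errorf("load snapshot: %w", RecordNotFoundError{Journal: "orders", Position: 7})
	fmt.Println(isNotFound(wrapped), ignoreNotFound(wrapped) == nil)
	fmt.Println(isConflict(ConflictError{Journal: "orders", Position: 7}), isConflict(wrapped))
}
```

IgnoreNotFound suits callers that treat a missing record as a normal outcome, such as loading optional state, while still surfacing every other failure.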

func RangeFromSearchResult added in v0.9.2

func RangeFromSearchResult[T any](
	ctx context.Context,
	j Journal[T],
	i Interval,
	cmp CompareFunc[T],
	fn RangeFunc[T],
) error

RangeFromSearchResult invokes fn for each record in the journal, in order, beginning with the record within the interval i for which cmp() returns zero.

It returns a ValueNotFoundError if there is no such record.

func RunBenchmarks added in v0.6.0

func RunBenchmarks(
	b *testing.B,
	store BinaryStore,
)

RunBenchmarks runs benchmarks against a BinaryStore implementation.

func RunTests

func RunTests(
	t *testing.T,
	store BinaryStore,
)

RunTests runs tests that confirm a journal implementation behaves correctly.

func Scan added in v0.8.0

func Scan[T, V any](
	ctx context.Context,
	j Journal[T],
	pos Position,
	scan ScanFunc[T, V],
) (V, error)

Scan finds a value of type V within j by scanning all records beginning with the record at the given position.

It returns a ValueNotFoundError if the value is not found.

This function is useful when the value being searched for is not ordered, or when there are only a small number of records to scan. If the records are structured in such a way that it's possible to know whether the value appears before or after a specific record, use Search instead.
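A linear scan driven by a ScanFunc might look like the sketch below. A slice stands in for the journal, so the slice index doubles as the record position. The sketch assumes scanning stops at the first record for which the ScanFunc reports ok. The names scan, firstOrderID, demo and errValueNotFound are hypothetical, as is the record format.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

type Position uint64

// ScanFunc mirrors the documented signature.
type ScanFunc[T, V any] func(ctx context.Context, pos Position, rec T) (v V, ok bool, err error)

// errValueNotFound stands in for ValueNotFoundError.
var errValueNotFound = errors.New("value not found")

// scan visits recs in order from pos and returns the first value that fn
// produces. A slice stands in for the journal, so the slice index is the
// record position.
func scan[T, V any](ctx context.Context, recs []T, pos Position, fn ScanFunc[T, V]) (V, error) {
	var zero V
	for p := pos; p < Position(len(recs)); p++ {
		v, ok, err := fn(ctx, p, recs[p])
		if err != nil {
			return zero, err
		}
		if ok {
			return v, nil
		}
	}
	return zero, errValueNotFound
}

// firstOrderID is a hypothetical ScanFunc that extracts the numeric ID from
// the first "order:<id>" record it sees.
func firstOrderID(_ context.Context, _ Position, rec string) (int, bool, error) {
	if !strings.HasPrefix(rec, "order:") {
		return 0, false, nil
	}
	id, err := strconv.Atoi(strings.TrimPrefix(rec, "order:"))
	return id, err == nil, err
}

// demo scans a fixed set of records starting at the given position.
func demo(from Position) (int, error) {
	recs := []string{"user:alice", "order:17", "user:bob", "order:42"}
	return scan[string, int](context.Background(), recs, from, firstOrderID)
}

func main() {
	fmt.Println(demo(0)) // scanning from the start finds order 17
	fmt.Println(demo(2)) // scanning from position 2 finds order 42
	fmt.Println(demo(4)) // nothing left to scan, so an error is returned
}
```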

func ScanFromSearchResult added in v0.8.0

func ScanFromSearchResult[T, V any](
	ctx context.Context,
	j Journal[T],
	i Interval,
	cmp CompareFunc[T],
	scan ScanFunc[T, V],
) (V, error)

ScanFromSearchResult finds a value within j by scanning all records, beginning with the record found by a binary search of the interval i using cmp as the comparator, that is, the record for which cmp returns zero. See Scan and Search.

It returns a ValueNotFoundError if the value is not found.

Types

type BinaryJournal added in v0.8.0

type BinaryJournal = Journal[[]byte]

A BinaryJournal is an append-only log of binary records.

type BinaryRangeFunc added in v0.8.0

type BinaryRangeFunc = RangeFunc[[]byte]

A BinaryRangeFunc is a function used to range over the records in a BinaryJournal.

If err is non-nil, ranging stops and err is propagated up the stack. Otherwise, if ok is false, ranging stops without any error being propagated.

type BinaryStore added in v0.8.0

type BinaryStore = Store[[]byte]

BinaryStore is a Store of journals that contain opaque binary records.

func WithTelemetry added in v0.2.1

WithTelemetry returns a BinaryStore that adds telemetry to s.

type CompareFunc added in v0.3.0

type CompareFunc[T any] func(context.Context, Position, T) (cmp int, err error)

CompareFunc is a function that compares a record to some datum.

If the record is less than the datum, cmp is negative. If the record is greater than the datum, cmp is positive. Otherwise, the record is considered equal to the datum.

type ConflictError added in v0.10.0

type ConflictError struct {
	Journal  string
	Position Position
}

ConflictError is returned by [Journal.Append] if there is already a record at the specified position.

func (ConflictError) Error added in v0.10.0

func (e ConflictError) Error() string

type Interval added in v0.10.0

type Interval struct {
	// Begin is the position of the first record in the interval.
	Begin Position

	// End is the position immediately after the last record in the interval.
	End Position
}

Interval describes a half-open interval of positions in a Journal.

func (Interval) Contains added in v0.10.0

func (i Interval) Contains(pos Position) bool

Contains returns true if the interval contains the given position.

func (Interval) IsEmpty added in v0.10.0

func (i Interval) IsEmpty() bool

IsEmpty returns true if the interval contains no records.

func (Interval) Len added in v0.10.0

func (i Interval) Len() int

Len returns the number of records in the interval.
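The half-open semantics of Interval can be illustrated with a local stand-in type. The method bodies below are assumptions that follow from the documented field comments, not the package's source.

```go
package main

import "fmt"

type Position uint64

// Interval is a local stand-in for the documented half-open interval.
type Interval struct {
	Begin Position
	End   Position
}

// Contains reports whether pos lies in [Begin, End).
func (i Interval) Contains(pos Position) bool {
	return i.Begin <= pos && pos < i.End
}

// IsEmpty reports whether the interval holds no positions.
func (i Interval) IsEmpty() bool {
	return i.Begin >= i.End
}

// Len returns the number of positions in the interval.
func (i Interval) Len() int {
	if i.IsEmpty() {
		return 0
	}
	return int(i.End - i.Begin)
}

func main() {
	i := Interval{Begin: 5, End: 8} // positions 5, 6 and 7
	fmt.Println(i.Contains(5), i.Contains(7), i.Contains(8))
	fmt.Println(i.Len(), i.IsEmpty())
}
```

Because End is exclusive, the End of a journal's bounds is also the position to pass to the next Append.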

func (Interval) Positions added in v0.10.0

func (i Interval) Positions() iter.Seq2[int, Position]

Positions returns a sequence of all positions in the interval.

func (Interval) String added in v0.10.0

func (i Interval) String() string

type Journal

type Journal[T any] interface {
	// Name returns the name of the journal.
	Name() string

	// Bounds returns the half-open interval [begin, end) describing the
	// positions of the first and last records in the journal.
	Bounds(ctx context.Context) (Interval, error)

	// Get returns the record at the given position.
	//
	// It returns a [RecordNotFoundError] if there is no record at the given
	// position.
	Get(ctx context.Context, pos Position) (rec T, err error)

	// Range invokes fn for each record in the journal, in order, starting with
	// the record at the given position.
	//
	// It returns a [RecordNotFoundError] if there is no record at the given
	// position.
	Range(ctx context.Context, pos Position, fn RangeFunc[T]) error

	// Append adds a record to the journal at the given position.
	//
	// The record is stored at the given position and the end of the journal
	// becomes pos + 1.
	//
	// pos must be the end of the journal, as returned by [Bounds]. If pos < end
	// then a [ConflictError] is returned, indicating that there is already a
	// record at the given position. The behavior is undefined if pos > end.
	Append(ctx context.Context, pos Position, rec T) error

	// Truncate removes journal records in the half-open interval [begin, pos),
	// such that pos becomes the new beginning of the journal.
	//
	// If it returns an error the truncation may have been partially applied.
	//
	// The behavior is undefined if pos > end.
	Truncate(ctx context.Context, pos Position) error

	// Close closes the journal.
	Close() error
}

A Journal is an append-only log containing records of type T.
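To make the Append and Truncate contracts concrete, here is a minimal in-memory sketch. It is a hypothetical implementation, not one shipped with the package. It is not safe for concurrent use, it omits Name, Range and Close, and it uses sentinel errors in place of ConflictError and RecordNotFoundError. Where the documentation leaves behavior undefined (pos > end), the sketch chooses to return an error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Position uint64

type Interval struct{ Begin, End Position }

// Sentinel errors standing in for ConflictError and RecordNotFoundError.
var (
	errConflict = errors.New("conflict: a record already exists at that position")
	errNotFound = errors.New("record not found")
)

// memJournal is a hypothetical in-memory journal. It is not safe for
// concurrent use and omits Name, Range and Close for brevity.
type memJournal[T any] struct {
	begin   Position // position of the oldest retained record
	records []T      // records[i] is stored at position begin+i
}

func (j *memJournal[T]) end() Position { return j.begin + Position(len(j.records)) }

func (j *memJournal[T]) Bounds(context.Context) (Interval, error) {
	return Interval{Begin: j.begin, End: j.end()}, nil
}

func (j *memJournal[T]) Get(_ context.Context, pos Position) (T, error) {
	if pos < j.begin || pos >= j.end() {
		var zero T
		return zero, errNotFound
	}
	return j.records[pos-j.begin], nil
}

func (j *memJournal[T]) Append(_ context.Context, pos Position, rec T) error {
	switch {
	case pos < j.end():
		return errConflict
	case pos > j.end():
		// Documented as undefined behavior; this sketch chooses to reject it.
		return errors.New("position is beyond the end of the journal")
	}
	j.records = append(j.records, rec)
	return nil
}

func (j *memJournal[T]) Truncate(_ context.Context, pos Position) error {
	if pos > j.end() {
		// Also documented as undefined; rejected here.
		return errors.New("position is beyond the end of the journal")
	}
	if pos > j.begin {
		j.records = j.records[pos-j.begin:]
		j.begin = pos
	}
	return nil
}

// demo appends two records, provokes a conflict, then truncates the first.
func demo() (bounds Interval, conflict, missing bool) {
	ctx := context.Background()
	j := &memJournal[string]{}
	_ = j.Append(ctx, 0, "a")
	_ = j.Append(ctx, 1, "b")
	conflict = errors.Is(j.Append(ctx, 1, "c"), errConflict) // position 1 is taken
	_ = j.Truncate(ctx, 1)
	_, err := j.Get(ctx, 0) // position 0 has been truncated
	missing = errors.Is(err, errNotFound)
	bounds, _ = j.Bounds(ctx)
	return bounds, conflict, missing
}

func main() {
	fmt.Println(demo())
}
```

The conflict on the second append to position 1 is the optimistic concurrency check: a writer holding a stale end position learns that another record got there first.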

type Position

type Position uint64

Position is the index of a record within a Journal. The first record is always at position 0.

func AppendWithConflictResolution added in v0.9.3

func AppendWithConflictResolution[T any](
	ctx context.Context,
	j Journal[T],
	end Position,
	rec T,
	fn func(context.Context, Position) (Position, error),
) (Position, error)

AppendWithConflictResolution appends a record to j using fn to resolve optimistic concurrency conflicts. It returns the new journal end position.

If a conflict occurs, fn is called and the append is retried at the position it returns. If fn returns an error, the append is not retried and the error is returned.
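The retry behavior described above can be sketched as a loop. This is an assumption about how such a helper might work, not the package's code. In particular, it assumes fn receives the position at which the conflict occurred. The sliceLog type, the errConflict sentinel and the appendWithRetry and demo names are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Position uint64

// errConflict stands in for ConflictError.
var errConflict = errors.New("conflict")

// sliceLog is a hypothetical journal of strings that begins at position 0.
type sliceLog struct{ records []string }

func (l *sliceLog) Append(_ context.Context, pos Position, rec string) error {
	if pos < Position(len(l.records)) {
		return errConflict
	}
	l.records = append(l.records, rec)
	return nil
}

// appendWithRetry keeps appending rec until it succeeds, asking resolve for a
// new position after each conflict. It assumes resolve receives the position
// at which the conflict occurred.
func appendWithRetry(
	ctx context.Context,
	l *sliceLog,
	end Position,
	rec string,
	resolve func(context.Context, Position) (Position, error),
) (Position, error) {
	for {
		err := l.Append(ctx, end, rec)
		if err == nil {
			return end + 1, nil // the new end of the journal
		}
		if !errors.Is(err, errConflict) {
			return 0, err
		}
		if end, err = resolve(ctx, end); err != nil {
			return 0, err // resolution failed, so give up
		}
	}
}

// demo appends with a stale end position of 0 to a log that already holds
// two records, resolving the conflict by re-reading the current end.
func demo() (newEnd Position, resolutions int, err error) {
	l := &sliceLog{records: []string{"a", "b"}}
	newEnd, err = appendWithRetry(context.Background(), l, 0, "c",
		func(context.Context, Position) (Position, error) {
			resolutions++
			return Position(len(l.records)), nil
		})
	return newEnd, resolutions, err
}

func main() {
	fmt.Println(demo())
}
```

In practice the resolver would typically read the records it missed and decide whether the append is still valid before returning a new position.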

func FirstRecord added in v0.4.0

func FirstRecord[T any](ctx context.Context, j Journal[T]) (Position, T, bool, error)

FirstRecord returns the oldest record in a journal.

func LastRecord added in v0.4.0

func LastRecord[T any](ctx context.Context, j Journal[T]) (Position, T, bool, error)

LastRecord returns the newest record in a journal.

func Search

func Search[T any](
	ctx context.Context,
	j Journal[T],
	i Interval,
	cmp CompareFunc[T],
) (pos Position, rec T, err error)

Search performs a binary search of j within the interval i to find the position of the record for which cmp() returns zero.

It returns a ValueNotFoundError if there is no such record.
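The following sketch shows how a CompareFunc can drive a binary search. A slice stands in for the journal, so the slice index is the record position, and the records are assumed to be ordered consistently with cmp. The names search, find and errValueNotFound are hypothetical, and the real implementation may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Position uint64

type Interval struct{ Begin, End Position }

// CompareFunc mirrors the documented signature: negative when the record is
// less than the datum, positive when greater, zero when equal.
type CompareFunc[T any] func(context.Context, Position, T) (cmp int, err error)

// errValueNotFound stands in for ValueNotFoundError.
var errValueNotFound = errors.New("value not found")

// search binary-searches recs within i. A slice stands in for the journal, so
// the slice index is the record position, and the records must already be
// ordered consistently with cmp.
func search[T any](ctx context.Context, recs []T, i Interval, cmp CompareFunc[T]) (Position, T, error) {
	var zero T
	lo, hi := i.Begin, i.End
	for lo < hi {
		mid := lo + (hi-lo)/2
		c, err := cmp(ctx, mid, recs[mid])
		switch {
		case err != nil:
			return 0, zero, err
		case c < 0:
			lo = mid + 1 // record is less than the datum: look right
		case c > 0:
			hi = mid // record is greater than the datum: look left
		default:
			return mid, recs[mid], nil
		}
	}
	return 0, zero, errValueNotFound
}

// find looks for target in a fixed, sorted set of records.
func find(target int) (Position, error) {
	recs := []int{10, 20, 30, 40, 50}
	pos, _, err := search[int](context.Background(), recs, Interval{Begin: 0, End: 5},
		func(_ context.Context, _ Position, rec int) (int, error) {
			return rec - target, nil
		})
	return pos, err
}

func main() {
	fmt.Println(find(40)) // 40 is stored at position 3
	fmt.Println(find(35)) // 35 is absent, so an error is returned
}
```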

type RangeFunc

type RangeFunc[T any] func(context.Context, Position, T) (ok bool, err error)

A RangeFunc is a function used to range over the records in a Journal.

If err is non-nil, ranging stops and err is propagated up the stack. Otherwise, if ok is false, ranging stops without any error being propagated.
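The stop-or-continue contract of a RangeFunc can be sketched with a slice standing in for the journal. The rangeFrom and takeUntil names are hypothetical. Unlike Journal.Range, this sketch does not report an error when there is no record at the starting position.

```go
package main

import (
	"context"
	"fmt"
)

type Position uint64

// RangeFunc mirrors the documented signature.
type RangeFunc[T any] func(context.Context, Position, T) (ok bool, err error)

// rangeFrom calls fn for each record from pos onward. A slice stands in for
// the journal, so the slice index is the record position.
func rangeFrom[T any](ctx context.Context, recs []T, pos Position, fn RangeFunc[T]) error {
	for p := pos; p < Position(len(recs)); p++ {
		ok, err := fn(ctx, p, recs[p])
		if err != nil {
			return err // propagate the error to the caller
		}
		if !ok {
			return nil // fn asked to stop; this is not an error
		}
	}
	return nil
}

// takeUntil collects records from pos until it meets stop, which is excluded.
func takeUntil(recs []string, pos Position, stop string) ([]string, error) {
	var out []string
	err := rangeFrom[string](context.Background(), recs, pos,
		func(_ context.Context, _ Position, rec string) (bool, error) {
			if rec == stop {
				return false, nil
			}
			out = append(out, rec)
			return true, nil
		})
	return out, err
}

func main() {
	recs := []string{"a", "b", "c", "d"}
	fmt.Println(takeUntil(recs, 1, "d"))
}
```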

type RecordNotFoundError added in v0.10.0

type RecordNotFoundError struct {
	Journal  string
	Position Position
}

RecordNotFoundError is returned by [Journal.Get] and [Journal.Range] if the requested record does not exist, either because it has been truncated or because the given position has not been written yet.

func (RecordNotFoundError) Error added in v0.10.0

func (e RecordNotFoundError) Error() string

type ScanFunc added in v0.8.0

type ScanFunc[T, V any] func(ctx context.Context, pos Position, rec T) (v V, ok bool, err error)

ScanFunc is a predicate function that produces a value of type V from a record of type T.

If the record cannot be used to produce a value of type V, ok is false.

type Store

type Store[T any] interface {
	Open(ctx context.Context, name string) (Journal[T], error)
}

Store is a collection of journals containing records of type T.

func NewMarshalingStore added in v0.8.0

func NewMarshalingStore[T any](
	s BinaryStore,
	m marshaler.Marshaler[T],
) Store[T]

NewMarshalingStore returns a new Store that marshals/unmarshals records of type T to/from an underlying BinaryStore.

type ValueNotFoundError added in v0.10.0

type ValueNotFoundError struct{}

ValueNotFoundError is returned by search and scan operations if the target value is not found.

func (ValueNotFoundError) Error added in v0.10.0

func (e ValueNotFoundError) Error() string
