checkpoint

v1.0.1 Latest
Published: Apr 22, 2024 License: MIT Imports: 2 Imported by: 8

README

Checkpoint

A Go package for turning an offset-based log of discrete data payloads into a queue, where a series of log offsets can be processed and committed asynchronously without the risk of any offset being committed prematurely.
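To illustrate the problem, here is a minimal sketch of how the highest safely committable offset can be derived when offsets finish out of order. It is not this package's implementation: the helper `highestCommittable` and the plain `int64` offsets are assumptions made for the example.

```go
package main

import "fmt"

// highestCommittable is a hypothetical helper, not part of this package.
// tracked lists offsets in the order they were read from the log, and
// resolved marks the ones whose processing has finished. It returns the
// last offset in the leading run of resolved offsets, or false when the
// oldest tracked offset is still unresolved.
func highestCommittable(tracked []int64, resolved map[int64]bool) (int64, bool) {
	var highest int64
	found := false
	for _, offset := range tracked {
		if !resolved[offset] {
			break // everything after an unresolved offset must wait
		}
		highest, found = offset, true
	}
	return highest, found
}

func main() {
	tracked := []int64{1, 2, 3, 4}

	// Offsets 1, 2 and 4 have finished, but 3 is still in flight.
	resolved := map[int64]bool{1: true, 2: true, 4: true}
	offset, ok := highestCommittable(tracked, resolved)
	fmt.Println(offset, ok) // 2 true

	// Once 3 finishes, everything up to 4 can be committed.
	resolved[3] = true
	offset, ok = highestCommittable(tracked, resolved)
	fmt.Println(offset, ok) // 4 true
}
```

Committing offset 4 in the first case would lose offset 3 if the process crashed before it finished, which is the premature commit the package is meant to rule out.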

For more information about how this package works and the problem it solves, check out this crazy video.

This package is "complete" in the sense that no further development work is planned, and any PRs proposing to expand its scope will be rejected. Please report bugs, and feel free to raise PRs to fix them.

Documentation

Types

type Capped

type Capped[T any] struct {
	// contains filtered or unexported fields
}

Capped receives an ordered feed of integer-based offsets to track and an unordered feed of integer-based offsets that have been resolved, and returns the highest offset that can currently be committed such that an unresolved offset is never committed.

If the number of unresolved tracked values reaches the given cap, the next attempt to track a value blocks until the next value is resolved.

This component is safe to use concurrently across goroutines.
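The blocking behaviour described above can be sketched with a buffered channel used as a semaphore. This is only an illustration of the cap, under the assumption that each tracked value counts as one unit. The type `capSketch` and its methods are hypothetical, do not track ordering, and are not how Capped is implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// capSketch is a hypothetical stand-in that models only the blocking cap.
type capSketch struct {
	slots chan struct{} // one slot per unresolved value
}

func newCapSketch(capacity int) *capSketch {
	return &capSketch{slots: make(chan struct{}, capacity)}
}

// track claims a slot, blocking while the cap is reached. It returns a
// function that resolves the value and frees the slot, or an error if ctx
// is done before a slot becomes free.
func (c *capSketch) track(ctx context.Context) (func(), error) {
	select {
	case c.slots <- struct{}{}:
		var once sync.Once // makes repeated resolve calls harmless
		return func() { once.Do(func() { <-c.slots }) }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// demo fills a cap of two, shows that a third attempt blocks until its
// context expires, then shows that resolving a value unblocks tracking.
func demo() (blockedErr, freedErr error) {
	c := newCapSketch(2)
	resolveFirst, _ := c.track(context.Background())
	_, _ = c.track(context.Background())

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	_, blockedErr = c.track(ctx)

	resolveFirst()
	_, freedErr = c.track(context.Background())
	return blockedErr, freedErr
}

func main() {
	blockedErr, freedErr := demo()
	fmt.Println(blockedErr) // context deadline exceeded
	fmt.Println(freedErr)   // <nil>
}
```

Passing a context with a deadline or cancellation, as Track does in its signature, gives the caller a way out when resolution stalls and the cap would otherwise block indefinitely.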

func NewCapped

func NewCapped[T any](capacity int64) *Capped[T]

NewCapped returns a new capped checkpointer.

func (*Capped[T]) Highest

func (c *Capped[T]) Highest() *T

Highest returns the current highest checkpoint.

func (*Capped[T]) Track

func (c *Capped[T]) Track(ctx context.Context, payload T, batchSize int64) (func() *T, error)

Track a new unresolved integer offset. This offset will be cached until it is marked as resolved. While it is cached, no higher-valued offset will ever be committed. If the provided value is lower than an already provided value, an error is returned.

type Uncapped

type Uncapped[T any] struct {
	// contains filtered or unexported fields
}

Uncapped keeps track of a sequence of pending checkpoint payloads. As pending checkpoints are resolved, it retains the latest resolved payload in the sequence for which all prior checkpoints are also resolved.

It also keeps track of the logical size of the unresolved sequence, which allows for limiting the number of pending checkpoints.
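As a rough sketch of the idea, the following stand-in keeps pending payloads in a singly linked list and advances the highest committable payload whenever the oldest entries are resolved. The names `tracker`, `entry`, `track` and `pendingSize` are hypothetical, the pending size is assumed to be the summed batch size of entries still in the list, and the code makes no claim to match the package's internals. It is also not safe for concurrent use.

```go
package main

import "fmt"

// entry is one tracked payload in a hypothetical linked list.
type entry[T any] struct {
	payload  T
	size     int64
	resolved bool
	next     *entry[T]
}

// tracker is an illustrative stand-in, not this package's implementation.
type tracker[T any] struct {
	head, tail *entry[T] // oldest and newest entries not yet committable
	highest    *T        // latest payload whose predecessors are all resolved
	pending    int64     // summed size of entries still in the list
}

// pendingSize reports the logical size of the entries still in the list.
func (t *tracker[T]) pendingSize() int64 { return t.pending }

// track appends a payload and returns a function that resolves it. The
// returned function reports the highest committable payload, or nil if an
// older payload is still unresolved and nothing was committable before.
func (t *tracker[T]) track(payload T, size int64) func() *T {
	e := &entry[T]{payload: payload, size: size}
	if t.tail == nil {
		t.head = e
	} else {
		t.tail.next = e
	}
	t.tail = e
	t.pending += size

	return func() *T {
		e.resolved = true
		// Pop the leading run of resolved entries. Each one advances the
		// highest committable payload and shrinks the pending size.
		for t.head != nil && t.head.resolved {
			t.highest = &t.head.payload
			t.pending -= t.head.size
			t.head = t.head.next
		}
		if t.head == nil {
			t.tail = nil
		}
		return t.highest
	}
}

func main() {
	var tr tracker[string]
	resolveA := tr.track("a", 1)
	resolveB := tr.track("b", 2)
	resolveC := tr.track("c", 1)

	got := resolveB() // "a" is still unresolved, so nothing can be committed
	fmt.Println(got == nil, tr.pendingSize()) // true 4

	got = resolveA() // "a" and "b" are now both resolved
	fmt.Println(*got, tr.pendingSize()) // b 1

	got = resolveC()
	fmt.Println(*got, tr.pendingSize()) // c 0
}
```

Note that resolving "b" early does not shrink the pending size in this sketch: the size only drops once the oldest entry is resolved and the baseline can move forward.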

func NewUncapped

func NewUncapped[T any]() *Uncapped[T]

NewUncapped returns a new checkpointer implemented via a linked list.

func (*Uncapped[T]) Highest

func (t *Uncapped[T]) Highest() *T

Highest returns the payload of the highest resolved checkpoint.

func (*Uncapped[T]) Pending

func (t *Uncapped[T]) Pending() int64

Pending returns the gap between the baseline and the end of our checkpoints.

func (*Uncapped[T]) Track

func (t *Uncapped[T]) Track(payload T, batchSize int64) func() *T

Track a new unresolved payload. This payload will be cached until it is marked as resolved. While it is cached, no more recent payload will ever be committed.
