checkpoint

package
v4.4.4 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 19, 2022 License: MIT Imports: 3 Imported by: 0

Documentation

Overview

Package checkpoint implements a mechanism for tracking checkpointed integer offsets for sequentially read, at-least-once queue systems such as Kafka or Kinesis.
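The core rule, that an offset may only be committed once it and every earlier tracked offset has been resolved, can be illustrated with a small standalone function. This is a minimal sketch rather than the package's implementation: highestCommittable, its parameters and the sample offsets are all hypothetical names and values chosen for illustration.

```go
package main

import "fmt"

// highestCommittable is a hypothetical helper, not part of this package.
// It returns the largest tracked offset for which that offset and every
// earlier tracked offset are resolved. ok is false when the first tracked
// offset is still unresolved, meaning nothing can be committed yet.
func highestCommittable(tracked []int64, resolved map[int64]bool) (offset int64, ok bool) {
	for _, o := range tracked {
		if !resolved[o] {
			// Stop at the first unresolved offset: committing past it
			// could lose the message on restart.
			break
		}
		offset, ok = o, true
	}
	return offset, ok
}

func main() {
	tracked := []int64{10, 11, 12, 13}
	// Offset 13 finished early, but 12 is still in flight.
	resolved := map[int64]bool{10: true, 11: true, 13: true}
	fmt.Println(highestCommittable(tracked, resolved)) // 11 true
}
```

Offset 13 is resolved in this example, yet it cannot be committed until 12 is resolved too, which is what gives at-least-once delivery after a restart.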

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Capped

type Capped struct {
	// contains filtered or unexported fields
}

Capped receives an ordered feed of integer-based offsets to track and an unordered feed of offsets that have been resolved, and returns the highest offset that can currently be committed without ever committing an unresolved offset.

If the number of unresolved tracked values meets a given cap, the next attempt to track a value will be blocked until the next value is resolved.

This component is safe to use concurrently across goroutines.

func NewCapped

func NewCapped(capacity int64) *Capped

NewCapped returns a new capped checkpointer.

func (*Capped) Highest

func (c *Capped) Highest() interface{}

Highest returns the current highest checkpoint.

func (*Capped) Track

func (c *Capped) Track(ctx context.Context, payload interface{}, batchSize int64) (func() interface{}, error)

Track a new unresolved integer offset. This offset will be cached until it is marked as resolved. While it is cached, no higher-valued offset will ever be committed. If the provided value is lower than an already provided value, an error is returned.

type Type

type Type struct {
	// contains filtered or unexported fields
}

Type keeps track of a sequence of pending checkpoint payloads. As pending checkpoints are resolved, it retains the latest fully resolved payload in the sequence, that is, the latest payload for which all prior checkpoints in the sequence are also resolved.

It also keeps track of the logical size of the unresolved sequence, which allows for limiting the number of pending checkpoints.
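To make the bookkeeping concrete, here is a simplified, self-contained stand-in that mirrors the documented method names. It is a sketch and not the package's code: sketchCheckpointer and its fields are hypothetical, the value returned by the resolve func is an assumption, and Pending is approximated as the sum of batch sizes still held in the queue.

```go
package main

import (
	"fmt"
	"sync"
)

// pending is one tracked payload and whether it has been resolved yet.
type pending struct {
	payload   interface{}
	batchSize int64
	resolved  bool
}

// sketchCheckpointer is a hypothetical, simplified stand-in for Type.
type sketchCheckpointer struct {
	mu      sync.Mutex
	queue   []*pending  // entries in tracking order, starting at the earliest unresolved one
	highest interface{} // latest payload whose predecessors are all resolved
}

// Track appends a payload and returns a func that marks it as resolved.
// Assumption: the func returns the highest committable payload afterwards.
func (s *sketchCheckpointer) Track(payload interface{}, batchSize int64) func() interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	p := &pending{payload: payload, batchSize: batchSize}
	s.queue = append(s.queue, p)
	return func() interface{} {
		s.mu.Lock()
		defer s.mu.Unlock()
		p.resolved = true
		// Advance past the resolved prefix of the queue.
		for len(s.queue) > 0 && s.queue[0].resolved {
			s.highest = s.queue[0].payload
			s.queue = s.queue[1:]
		}
		return s.highest
	}
}

// Highest returns the payload of the highest resolved checkpoint, or nil.
func (s *sketchCheckpointer) Highest() interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.highest
}

// Pending sums the batch sizes between the earliest unresolved entry and
// the latest tracked entry.
func (s *sketchCheckpointer) Pending() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	var n int64
	for _, p := range s.queue {
		n += p.batchSize
	}
	return n
}

func main() {
	s := &sketchCheckpointer{}
	resolveA := s.Track("a", 1)
	resolveB := s.Track("b", 1)

	resolveB() // "b" is done, but "a" still blocks it
	fmt.Println(s.Highest(), s.Pending()) // <nil> 2

	resolveA() // both are now committable
	fmt.Println(s.Highest(), s.Pending()) // b 0
}
```

Resolving out of order never moves the highest payload past an unresolved entry, which is the property the description above relies on.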

func New

func New() *Type

New returns a new checkpointer.

func (*Type) Highest

func (t *Type) Highest() interface{}

Highest returns the payload of the highest resolved checkpoint.

func (*Type) Pending

func (t *Type) Pending() int64

Pending returns the gap between the earliest and latest unresolved messages.

func (*Type) Track

func (t *Type) Track(payload interface{}, batchSize int64) func() interface{}

Track a new unresolved payload. This payload will be cached until it is marked as resolved. While it is cached, no more recent payload will ever be committed.

While the returned resolve funcs can be called from any goroutine, it is assumed that Track is called from a single goroutine.
