stream

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: GPL-3.0 Imports: 7 Imported by: 0

Documentation

Overview

Package stream provide a generic stream function.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Stream

func Stream[E any](ctx context.Context, deps Deps[E], startHeight uint64, callback Callback[E]) error

Stream streams elements from the provided height (inclusive) of a specific chain. It fetches the batches of elements from the current height, and calls the callback function for each element in strictly-sequential order.

It supports concurrent fetching of single-element-batches only. It retries forever on fetch errors. It can either retry or return callback errors. It returns (nil) when the context is canceled.

Types

type Cache added in v0.11.0

type Cache[E any] interface {
	// Get returns strictly sequential elements from the provided height (inclusive) or nil.
	Get(from uint64) []E
	// Set sets elements from the provided height (inclusive).
	// The elements must be strictly sequential.
	Set(from uint64, elems []E)
}

Cache abstracts a simple element cache.

type Callback

type Callback[E any] func(ctx context.Context, elem E) error

type Deps

type Deps[E any] struct {

	// FetchBatch fetches the next batch of elements from the provided height (inclusive) if available.
	// Returned elements must be strictly-sequential, since the internal height cursors is incremented for each element returned.
	FetchBatch func(ctx context.Context, height uint64) ([]E, error)
	// Backoff returns a backoff function. See expbackoff package for the implementation.
	Backoff func(ctx context.Context) func()
	// Verify is a sanity check function, it ensures each element is valid.
	Verify func(ctx context.Context, elem E, height uint64) error
	// Height returns the height of an element.
	Height func(elem E) uint64

	// Config
	FetchWorkers  uint64
	ElemLabel     string
	HeightLabel   string
	RetryCallback bool

	// Metrics
	IncFetchErr        func()
	IncCallbackErr     func()
	SetStreamHeight    func(uint64)
	SetCallbackLatency func(time.Duration)
	StartTrace         func(ctx context.Context, height uint64, spanName string) (context.Context, trace.Span)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL