stream

package
v0.1.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2024 License: GPL-3.0 Imports: 3 Imported by: 0

Documentation

Overview

Package stream provide a generic stream function.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Stream

func Stream[E any](ctx context.Context, deps Deps[E], srcChainID uint64, height uint64, callback Callback[E]) error

Stream streams elements from the provided height (inclusive) on a specific chain. It fetches the next batch of elements from the current height, then calls the callback function for each, then repeats (forever).

It retries forever on fetch errors. It can either retry or return callback errors. It returns (nil) when the context is canceled.

Types

type Callback

type Callback[E any] func(ctx context.Context, elem E) error

type Deps

type Deps[E any] struct {

	// FetchBatch fetches the next batch elements from the provided height (inclusive).
	// The elements must be sequential, since the internal height cursors is incremented for each element returned.
	FetchBatch func(ctx context.Context, chainID uint64, height uint64) ([]E, error)
	// Backoff returns a backoff and reset function. See expbackoff package for the implementation.
	Backoff func(ctx context.Context) (func(), func())
	// Verify is a sanity check function, it ensures each element is valid.
	Verify func(ctx context.Context, elem E, height uint64) error

	// Config
	ElemLabel     string
	RetryCallback bool

	// Metrics
	IncFetchErr     func()
	IncCallbackErr  func()
	SetStreamHeight func(uint64)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL