stream

package
v0.1.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2024 License: GPL-3.0 Imports: 6 Imported by: 0

Documentation

Overview

Package stream provide a generic stream function.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Stream

func Stream[E any](ctx context.Context, deps Deps[E], srcChainID uint64, startHeight uint64, callback Callback[E]) error

Types

type Callback

type Callback[E any] func(ctx context.Context, elem E) error

type Deps

type Deps[E any] struct {

	// FetchBatch fetches the next batch of elements from the provided height (inclusive).
	// The elements must be sequential, since the internal height cursors is incremented for each element returned.
	FetchBatch func(ctx context.Context, chainID uint64, height uint64) ([]E, error)
	// Backoff returns a backoff function. See expbackoff package for the implementation.
	Backoff func(ctx context.Context) func()
	// Verify is a sanity check function, it ensures each element is valid.
	Verify func(ctx context.Context, elem E, height uint64) error
	// Height returns the height of an element.
	Height func(elem E) uint64

	// Config
	FetchWorkers  uint64
	ElemLabel     string
	RetryCallback bool

	// Metrics
	IncFetchErr        func()
	IncCallbackErr     func()
	SetStreamHeight    func(uint64)
	SetCallbackLatency func(time.Duration)
	StartTrace         func(ctx context.Context, height uint64, spanName string) (context.Context, trace.Span)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL