streams

package
v0.0.0-...-5068cfe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: MPL-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package streams provides a framework for creating and managing concurrent worker pools to process streams of data, including utilities for iterating over blocks of data.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockIterator

type BlockIterator struct {
	// contains filtered or unexported fields
}

BlockIterator manages the iteration over a range of blocks, supporting batching and thread-safe updates to the current offset.

func NewBlockIterator

func NewBlockIterator(offset uint64, end uint64, step *uint64) *BlockIterator

NewBlockIterator creates a new BlockIterator with the specified offset, end, and batch size.

func (*BlockIterator) Completed

func (b *BlockIterator) Completed() bool

Completed checks if the iteration has reached or passed the end of the block range.

func (*BlockIterator) GetCurrentOffset

func (b *BlockIterator) GetCurrentOffset() uint64

GetCurrentOffset returns the current offset in the block range.

func (*BlockIterator) GetEnd

func (b *BlockIterator) GetEnd() uint64

GetEnd returns the end of the block range.

func (*BlockIterator) GetEndAsBigInt

func (b *BlockIterator) GetEndAsBigInt() *big.Int

GetEndAsBigInt returns the end of the block range as a big.Int.

func (*BlockIterator) Next

func (b *BlockIterator) Next() (start uint64, end uint64, ok bool)

Next returns the next batch of blocks to be processed. It updates the current offset and returns the start and end of the batch, as well as a boolean indicating if the operation was successful.

type OrderedResult

type OrderedResult[T any, R *types.QueryResponse] struct {
	// contains filtered or unexported fields
}

OrderedResult holds the result of processing a descriptor, including its index, the response, and any error encountered.

type Worker

type Worker[T any, R *types.QueryResponse] struct {
	// contains filtered or unexported fields
}

Worker represents a type-specific worker that processes descriptors using a provided WorkerFn. It manages the processing state and results.

func NewWorker

func NewWorker[T any, R *types.QueryResponse](ctx context.Context, iterator *BlockIterator, channel chan R, done chan struct{}, opts *options.StreamOptions) (*Worker[T, R], error)

NewWorker creates a new instance of a type-specific Worker.

func (*Worker[T, R]) Ack

func (w *Worker[T, R]) Ack()

Ack acknowledges that a response has been processed.

func (*Worker[T, R]) Done

func (w *Worker[T, R]) Done() <-chan struct{}

Done returns a channel that can be used to signal when the worker's operations are done.

func (*Worker[T, R]) Start

func (w *Worker[T, R]) Start(workerFn WorkerFn[T, R], descriptor <-chan T) error

Start begins the worker's operation using the provided WorkerFn and a channel of descriptors.

func (*Worker[T, R]) Stop

func (w *Worker[T, R]) Stop() error

Stop stops the worker's operations and waits for all goroutines to complete.

type WorkerFn

type WorkerFn[T any, R *types.QueryResponse] func(descriptor T) (R, error)

WorkerFn defines a generic function type that takes a descriptor of type T and returns a response of type R and an error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL