requester

package
v0.14.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2021 License: AGPL-3.0 Imports: 14 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	BatchInterval  time.Duration // minimum interval between requests
	BatchThreshold uint          // maximum batch size for one request
	RetryInitial   time.Duration // interval after which we retry request for an entity
	RetryFunction  RetryFunc     // function determining growth of retry interval
	RetryMaximum   time.Duration // maximum interval for retrying request for an entity
	RetryAttempts  uint          // maximum amount of request attemps per entity
}

type CreateFunc

type CreateFunc func() flow.Entity

CreateFunc is a function that creates a `flow.Entity` with an underlying type so that we can properly decode entities transmitted over the network.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is a generic requester engine, handling the requesting of entities on the flow network. It is the `request` part of the request-reply pattern provided by the pair of generic exchange engines.

func New

func New(log zerolog.Logger, metrics module.EngineMetrics, net module.Network, me module.Local, state protocol.State,
	channel network.Channel, selector flow.IdentityFilter, create CreateFunc, options ...OptionFunc) (*Engine, error)

New creates a new requester engine, operating on the provided network channel, and requesting entities from a node within the set obtained by applying the provided selector filter. The options allow customization of the parameters related to the batch and retry logic.

func (*Engine) Done

func (e *Engine) Done() <-chan struct{}

Done returns a done channel that is closed once the engine has fully stopped. For the consensus engine, we wait for hotstuff to finish.

func (*Engine) EntityByID

func (e *Engine) EntityByID(entityID flow.Identifier, selector flow.IdentityFilter)

EntityByID adds an entity to the list of entities to be requested from the provider. It is idempotent, meaning that adding the same entity to the requester engine multiple times has no effect, unless the item has expired due to too many requests and has thus been deleted from the list. The provided selector will be applied to the set of valid providers on top of the global selector injected upon construction. It allows for finer-grained control over which subset of providers to request a given entity from, such as selection of a collection cluster. Use `filter.Any` if no additional selection is required.

func (*Engine) Force

func (e *Engine) Force()

Force will force the requester engine to dispatch all currently valid batch requests.

func (*Engine) Process

func (e *Engine) Process(originID flow.Identifier, message interface{}) error

Process processes the given message from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.

func (*Engine) ProcessLocal

func (e *Engine) ProcessLocal(message interface{}) error

ProcessLocal processes an message originating on the local node.

func (*Engine) Ready

func (e *Engine) Ready() <-chan struct{}

Ready returns a ready channel that is closed once the engine has fully started. For consensus engine, this is true once the underlying consensus algorithm has started.

func (*Engine) Submit

func (e *Engine) Submit(originID flow.Identifier, message interface{})

Submit submits the given message from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.

func (*Engine) SubmitLocal

func (e *Engine) SubmitLocal(message interface{})

SubmitLocal submits an message originating on the local node.

func (*Engine) WithHandle

func (e *Engine) WithHandle(handle HandleFunc) *Engine

WithHandle sets the handle function of the requester, which is how it processes returned entities. The engine can not be started without setting the handle function. It is done in a separate call so that the requester can be injected into engines upon construction, and then provide a handle function to the requester from that engine itself.

type HandleFunc

type HandleFunc func(originID flow.Identifier, entity flow.Entity)

HandleFunc is a function provided to the requester engine to handle an entity once it has been retrieved from a provider. The function should be non-blocking and errors should be handled internally within the function.

type Item

type Item struct {
	EntityID      flow.Identifier     // ID for the entity to be requested
	NumAttempts   uint                // number of times the entity was requested
	LastRequested time.Time           // approximate timestamp of last request
	RetryAfter    time.Duration       // interval until request should be retried
	ExtraSelector flow.IdentityFilter // additional filters for providers of this entity
}

type OptionFunc

type OptionFunc func(*Config)

func WithBatchInterval

func WithBatchInterval(interval time.Duration) OptionFunc

WithBatchInterval sets a custom interval at which we scan for pending items and batch them for requesting.

func WithBatchThreshold

func WithBatchThreshold(threshold uint) OptionFunc

WithBatchThreshold sets a custom threshold for the maximum size of a batch. If we have the given amount of pending items, we immediately send a batch.

func WithRetryAttempts

func WithRetryAttempts(attempts uint) OptionFunc

WithRetryAttempts sets the number of attempts we will make before we give up on retrying. Use zero for infinite retries.

func WithRetryFunction

func WithRetryFunction(retry RetryFunc) OptionFunc

WithRetryFunction sets the function at which the retry interval increases.

func WithRetryInitial

func WithRetryInitial(interval time.Duration) OptionFunc

WithRetryInitial sets the initial interval for dispatching a request for the second time.

func WithRetryMaximum

func WithRetryMaximum(interval time.Duration) OptionFunc

WithRetryMaximum sets the maximum retry interval at which we will retry.

type RetryFunc

type RetryFunc func(time.Duration) time.Duration

func RetryConstant

func RetryConstant() RetryFunc

func RetryExponential

func RetryExponential(exponent float64) RetryFunc

func RetryGeometric

func RetryGeometric(factor float64) RetryFunc

func RetryLinear

func RetryLinear(increase time.Duration) RetryFunc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL