Documentation ¶
Index ¶
- type Config
- type CreateFunc
- type Engine
- func (e *Engine) Done() <-chan struct{}
- func (e *Engine) EntityByID(entityID flow.Identifier, selector flow.IdentityFilter[flow.Identity])
- func (e *Engine) Force()
- func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message interface{}) error
- func (e *Engine) ProcessLocal(message interface{}) error
- func (e *Engine) Query(key flow.Identifier, selector flow.IdentityFilter[flow.Identity])
- func (e *Engine) Ready() <-chan struct{}
- func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, message interface{})
- func (e *Engine) SubmitLocal(message interface{})
- func (e *Engine) WithHandle(handle HandleFunc)
- type HandleFunc
- type Item
- type OptionFunc
- func WithBatchInterval(interval time.Duration) OptionFunc
- func WithBatchThreshold(threshold uint) OptionFunc
- func WithRetryAttempts(attempts uint) OptionFunc
- func WithRetryFunction(retry RetryFunc) OptionFunc
- func WithRetryInitial(interval time.Duration) OptionFunc
- func WithRetryMaximum(interval time.Duration) OptionFunc
- func WithValidateStaking(validateStaking bool) OptionFunc
- type RetryFunc
Types ¶
type Config ¶
type Config struct {
	BatchInterval   time.Duration // minimum interval between requests
	BatchThreshold  uint          // maximum batch size for one request
	RetryInitial    time.Duration // interval after which we retry request for an entity
	RetryFunction   RetryFunc     // function determining growth of retry interval
	RetryMaximum    time.Duration // maximum interval for retrying request for an entity
	RetryAttempts   uint          // maximum amount of request attempts per entity
	ValidateStaking bool          // should staking of target/origin be checked
}
type CreateFunc ¶
CreateFunc is a function that creates a `flow.Entity` with an underlying type so that we can properly decode entities transmitted over the network.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is a generic requester engine, handling the requesting of entities on the flow network. It is the `request` part of the request-reply pattern provided by the pair of generic exchange engines.
func New ¶
func New(log zerolog.Logger, metrics module.EngineMetrics, net network.EngineRegistry, me module.Local, state protocol.State, channel channels.Channel, selector flow.IdentityFilter[flow.Identity], create CreateFunc, options ...OptionFunc) (*Engine, error)
New creates a new requester engine, operating on the provided network channel, and requesting entities from a node within the set obtained by applying the provided selector filter. The options allow customization of the parameters related to the batch and retry logic.
func (*Engine) Done ¶
func (e *Engine) Done() <-chan struct{}
Done returns a done channel that is closed once the engine has fully stopped. For the consensus engine, we wait for hotstuff to finish.
func (*Engine) EntityByID ¶
func (e *Engine) EntityByID(entityID flow.Identifier, selector flow.IdentityFilter[flow.Identity])
EntityByID adds an entity to the list of entities to be requested from the provider. It is idempotent, meaning that adding the same entity to the requester engine multiple times has no effect, unless the item has expired due to too many requests and has thus been deleted from the list. The provided selector will be applied to the set of valid providers on top of the global selector injected upon construction. It allows for finer-grained control over which subset of providers to request a given entity from, such as selection of a collection cluster. Use `filter.Any` if no additional selection is required. EntityByID checks the integrity of the response to make sure that the entity received is the one that was requested.
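The idempotency described above can be pictured with a map keyed by entity ID. This is a minimal sketch under stated assumptions: `Identifier` is a stand-in for `flow.Identifier`, and `pendingItem`, `requester`, `add` and `expire` are hypothetical names, not the engine's internals.

```go
package main

import "fmt"

// Identifier is a stand-in for flow.Identifier (assumption for this sketch).
type Identifier [32]byte

// pendingItem is a simplified, hypothetical record of a pending request.
type pendingItem struct {
	NumAttempts uint
}

// requester tracks entities that still need to be requested.
type requester struct {
	items map[Identifier]*pendingItem
}

func newRequester() *requester {
	return &requester{items: make(map[Identifier]*pendingItem)}
}

// add registers the entity only if it is not already pending and
// reports whether a new item was created.
func (r *requester) add(id Identifier) bool {
	if _, exists := r.items[id]; exists {
		return false
	}
	r.items[id] = &pendingItem{}
	return true
}

// expire removes an item, as would happen after too many attempts.
func (r *requester) expire(id Identifier) {
	delete(r.items, id)
}

func main() {
	r := newRequester()
	id := Identifier{0x01}
	fmt.Println(r.add(id)) // first add creates the item
	fmt.Println(r.add(id)) // repeated add has no effect
	r.expire(id)
	fmt.Println(r.add(id)) // after expiry the entity can be added again
}
```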
func (*Engine) Force ¶
func (e *Engine) Force()
Force will force the requester engine to dispatch all currently valid batch requests.
func (*Engine) Process ¶
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message interface{}) error
Process processes the given message from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.
func (*Engine) ProcessLocal ¶
func (e *Engine) ProcessLocal(message interface{}) error
ProcessLocal processes a message originating on the local node.
func (*Engine) Query ¶ added in v0.15.0
func (e *Engine) Query(key flow.Identifier, selector flow.IdentityFilter[flow.Identity])
Query will request data through the request engine backing the interface. The additional selector will be applied to the subset of valid providers for the data and allows finer-grained control over which providers to request data from. Unlike EntityByID, Query does not perform an integrity check, so it can be used to get entities without knowing their ID.
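The difference between the two request paths can be sketched as a single acceptance check with the integrity test switched on or off. Everything here is illustrative: `Identifier` stands in for `flow.Identifier`, the `entity` type and `accept` helper are hypothetical, and hashing the payload with SHA-256 is only a placeholder for however entity IDs are actually derived.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// Identifier is a stand-in for flow.Identifier (assumption for this sketch).
type Identifier [32]byte

// entity is a hypothetical entity whose ID is derived from its payload.
type entity struct {
	payload string
}

// ID hashes the payload; SHA-256 is a placeholder for the real ID scheme.
func (e entity) ID() Identifier {
	return Identifier(sha256.Sum256([]byte(e.payload)))
}

// accept decides whether a received entity satisfies a request.
// With checkIntegrity set, the entity's ID must equal the requested key.
// Without it, any response is accepted, so the key need not be an entity ID.
func accept(requested Identifier, received entity, checkIntegrity bool) bool {
	if !checkIntegrity {
		return true
	}
	return received.ID() == requested
}

func main() {
	wanted := entity{payload: "collection-a"}
	other := entity{payload: "collection-b"}

	fmt.Println(accept(wanted.ID(), wanted, true))  // matching ID passes the check
	fmt.Println(accept(wanted.ID(), other, true))   // mismatched ID is rejected
	fmt.Println(accept(Identifier{}, other, false)) // no check: accepted regardless of key
}
```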
func (*Engine) Ready ¶
func (e *Engine) Ready() <-chan struct{}
Ready returns a ready channel that is closed once the engine has fully started. For the consensus engine, this is true once the underlying consensus algorithm has started.
func (*Engine) Submit ¶
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, message interface{})
Submit submits the given message from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.
func (*Engine) SubmitLocal ¶
func (e *Engine) SubmitLocal(message interface{})
SubmitLocal submits a message originating on the local node.
func (*Engine) WithHandle ¶
func (e *Engine) WithHandle(handle HandleFunc)
WithHandle sets the handle function of the requester, which is how it processes returned entities. The engine cannot be started without setting the handle function. This is done in a separate call so that the requester can be injected into an engine upon construction, and that engine can then provide its own handle function to the requester.
type HandleFunc ¶
type HandleFunc func(originID flow.Identifier, entity flow.Entity)
HandleFunc is a function provided to the requester engine to handle an entity once it has been retrieved from a provider. The function should be non-blocking and errors should be handled internally within the function.
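The two-step wiring described for WithHandle, together with a non-blocking handler, might look like the sketch below. It is not the engine's implementation: `Identifier` and `Entity` are stand-ins for the `flow` types, and `requester`, `start`, `consumer`, `newConsumer`, `onEntity` and `blob` are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Identifier and Entity are stand-ins for the flow types (assumptions).
type Identifier [32]byte

type Entity interface {
	ID() Identifier
}

// HandleFunc matches the shape documented above.
type HandleFunc func(originID Identifier, entity Entity)

// requester is a hypothetical, stripped-down requester.
type requester struct {
	handle HandleFunc
}

// WithHandle stores the handler after construction.
func (r *requester) WithHandle(handle HandleFunc) {
	r.handle = handle
}

// start refuses to run until a handle function has been set.
func (r *requester) start() error {
	if r.handle == nil {
		return errors.New("handle function not set")
	}
	return nil
}

// consumer is a hypothetical engine that receives the requester on construction.
type consumer struct {
	received chan Entity
}

// newConsumer registers the consumer's own method as the handle function.
func newConsumer(r *requester) *consumer {
	c := &consumer{received: make(chan Entity, 16)}
	r.WithHandle(c.onEntity)
	return c
}

// onEntity stays non-blocking by queueing into a buffered channel.
func (c *consumer) onEntity(originID Identifier, entity Entity) {
	select {
	case c.received <- entity:
	default:
		// Queue is full: drop the entity here; a real handler would record this.
	}
}

// blob is a trivial Entity used only for the demonstration.
type blob struct {
	id Identifier
}

func (b blob) ID() Identifier { return b.id }

func main() {
	r := &requester{}
	fmt.Println(r.start() != nil) // cannot start before the handle is set

	c := newConsumer(r)
	fmt.Println(r.start() == nil) // can start once the consumer registered itself

	r.handle(Identifier{}, blob{id: Identifier{7}})
	fmt.Println(len(c.received)) // number of queued entities
}
```

Queueing into a buffered channel is one way to keep the handler from blocking the requester; the buffer size of 16 is arbitrary.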
type Item ¶
type Item struct {
	EntityID      flow.Identifier                    // ID for the entity to be requested
	NumAttempts   uint                               // number of times the entity was requested
	LastRequested time.Time                          // approximate timestamp of last request
	RetryAfter    time.Duration                      // interval until request should be retried
	ExtraSelector flow.IdentityFilter[flow.Identity] // additional filters for providers of this entity
	// contains filtered or unexported fields
}
type OptionFunc ¶
type OptionFunc func(*Config)
func WithBatchInterval ¶
func WithBatchInterval(interval time.Duration) OptionFunc
WithBatchInterval sets a custom interval at which we scan for pending items and batch them for requesting.
func WithBatchThreshold ¶
func WithBatchThreshold(threshold uint) OptionFunc
WithBatchThreshold sets a custom threshold for the maximum size of a batch. If we have the given amount of pending items, we immediately send a batch.
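One way to picture how the batch interval and batch threshold could interact is a small batcher that dispatches as soon as the threshold is reached and otherwise waits for a periodic flush. This is a sketch of the general technique only; the `batcher` type and its methods are hypothetical, and the periodic scan is represented by a direct call to `flush` rather than a real ticker.

```go
package main

import "fmt"

// batcher is a hypothetical collector of pending entity IDs.
type batcher struct {
	threshold uint       // maximum number of items in one batch
	pending   []string   // items waiting to be requested
	sent      [][]string // batches that have been dispatched
}

// add queues an item and dispatches immediately once the threshold is reached.
func (b *batcher) add(id string) {
	b.pending = append(b.pending, id)
	if uint(len(b.pending)) >= b.threshold {
		b.flush()
	}
}

// flush dispatches whatever is pending. In this sketch it stands in for the
// work a timer firing once per batch interval would trigger.
func (b *batcher) flush() {
	if len(b.pending) == 0 {
		return
	}
	b.sent = append(b.sent, b.pending)
	b.pending = nil
}

func main() {
	b := &batcher{threshold: 3}

	for _, id := range []string{"a", "b", "c"} {
		b.add(id)
	}
	fmt.Println(len(b.sent)) // threshold reached, so one batch has been sent

	b.add("d")
	b.flush() // simulated interval tick sends the partial batch
	fmt.Println(len(b.sent), len(b.sent[1]))
}
```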
func WithRetryAttempts ¶
func WithRetryAttempts(attempts uint) OptionFunc
WithRetryAttempts sets the number of attempts we will make before we give up on retrying. Use zero for infinite retries.
func WithRetryFunction ¶
func WithRetryFunction(retry RetryFunc) OptionFunc
WithRetryFunction sets the function that determines how the retry interval grows.
func WithRetryInitial ¶
func WithRetryInitial(interval time.Duration) OptionFunc
WithRetryInitial sets the initial interval for dispatching a request for the second time.
func WithRetryMaximum ¶
func WithRetryMaximum(interval time.Duration) OptionFunc
WithRetryMaximum sets the maximum interval between retries.
func WithValidateStaking ¶ added in v0.21.0
func WithValidateStaking(validateStaking bool) OptionFunc
WithValidateStaking sets the flag that determines whether the target and origin must be checked for staking.
type RetryFunc ¶
func RetryConstant ¶
func RetryConstant() RetryFunc