Documentation ¶
Index ¶
- Constants
- Variables
- func HandleSubscription[T any](sub Subscription, handleResponse func(resp T) error) error
- type BaseTracker
- type BaseTrackerImpl
- type BlockTracker
- type BlockTrackerImpl
- type ExecutionDataTracker
- type ExecutionDataTrackerImpl
- func (e *ExecutionDataTrackerImpl) GetHighestHeight() uint64
- func (e *ExecutionDataTrackerImpl) GetStartHeight(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) (uint64, error)
- func (e *ExecutionDataTrackerImpl) GetStartHeightFromBlockID(startBlockID flow.Identifier) (uint64, error)
- func (e *ExecutionDataTrackerImpl) GetStartHeightFromHeight(startHeight uint64) (uint64, error)
- func (e *ExecutionDataTrackerImpl) GetStartHeightFromLatest(ctx context.Context) (uint64, error)
- func (e *ExecutionDataTrackerImpl) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)
- type GetDataByHeightFunc
- type HeightBasedSubscription
- type Streamable
- type Streamer
- type StreamingData
- type Subscription
- type SubscriptionHandler
- type SubscriptionImpl
- func (sub *SubscriptionImpl) Channel() <-chan interface{}
- func (sub *SubscriptionImpl) Close()
- func (sub *SubscriptionImpl) Err() error
- func (sub *SubscriptionImpl) Fail(err error)
- func (sub *SubscriptionImpl) ID() string
- func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error
Constants ¶
const ( // DefaultSendBufferSize is the default buffer size for the subscription's send channel. // The size is chosen to balance memory overhead from each subscription with performance when // streaming existing data. DefaultSendBufferSize = 10 // DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time. DefaultMaxGlobalStreams = 1000 // DefaultCacheSize defines the default max number of objects for the execution data cache. DefaultCacheSize = 100 // DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout // expires, the connection is closed. DefaultSendTimeout = 30 * time.Second // DefaultResponseLimit is default max responses per second allowed on a stream. After exceeding // the limit, the stream is paused until more capacity is available. DefaultResponseLimit = float64(0) // DefaultHeartbeatInterval specifies the block interval at which heartbeat messages should be sent. DefaultHeartbeatInterval = 1 )
Variables ¶
var ErrBlockNotReady = errors.New("block not ready")
ErrBlockNotReady represents an error indicating that a block is not yet available or ready.
var ErrEndOfData = errors.New("end of data")
ErrEndOfData represents an error indicating that no more data available for streaming.
Functions ¶
func HandleSubscription ¶
func HandleSubscription[T any](sub Subscription, handleResponse func(resp T) error) error
HandleSubscription is a generic handler for subscriptions to a specific type. It continuously listens to the subscription channel, handles the received responses, and sends the processed information to the client via the provided stream using handleResponse.
Parameters: - sub: The subscription. - handleResponse: The function responsible for handling the response of the subscribed type.
Expected errors during normal operation:
- codes.Internal: If the subscription encounters an error or gets an unexpected response.
Types ¶
type BaseTracker ¶
type BaseTracker interface { // GetStartHeightFromBlockID returns the start height based on the provided starting block ID. // If the start block is the root block, skip it and begin from the next block. // // Parameters: // - startBlockID: The identifier of the starting block. // // Returns: // - uint64: The start height associated with the provided block ID. // - error: An error indicating any issues with retrieving the start height. // // Expected errors during normal operation: // - codes.NotFound - if the block was not found in storage // - codes.Internal - for any other error GetStartHeightFromBlockID(flow.Identifier) (uint64, error) // GetStartHeightFromHeight returns the start height based on the provided starting block height. // If the start block is the root block, skip it and begin from the next block. // // Parameters: // - startHeight: The height of the starting block. // // Returns: // - uint64: The start height associated with the provided block height. // - error: An error indicating any issues with retrieving the start height. // // Expected errors during normal operation: // - codes.InvalidArgument - if the start height is less than the root block height. // - codes.NotFound - if the header was not found in storage. GetStartHeightFromHeight(uint64) (uint64, error) // GetStartHeightFromLatest returns the start height based on the latest sealed block. // If the start block is the root block, skip it and begin from the next block. // // Parameters: // - ctx: Context for the operation. // // No errors are expected during normal operation. GetStartHeightFromLatest(context.Context) (uint64, error) }
BaseTracker is an interface for a tracker that provides base GetStartHeight method related to both blocks and execution data tracking.
type BaseTrackerImpl ¶
type BaseTrackerImpl struct {
// contains filtered or unexported fields
}
BaseTrackerImpl is an implementation of the BaseTracker interface.
func NewBaseTrackerImpl ¶
func NewBaseTrackerImpl( rootBlockHeight uint64, state protocol.State, headers storage.Headers, ) *BaseTrackerImpl
NewBaseTrackerImpl creates a new instance of BaseTrackerImpl.
Parameters: - rootBlockHeight: The root block height, which serves as the baseline for calculating the start height. - state: The protocol state used for retrieving block information. - headers: The storage headers for accessing block headers.
Returns: - *BaseTrackerImpl: A new instance of BaseTrackerImpl.
func (*BaseTrackerImpl) GetStartHeightFromBlockID ¶
func (b *BaseTrackerImpl) GetStartHeightFromBlockID(startBlockID flow.Identifier) (uint64, error)
GetStartHeightFromBlockID returns the start height based on the provided starting block ID. If the start block is the root block, skip it and begin from the next block.
Parameters: - startBlockID: The identifier of the starting block.
Returns: - uint64: The start height associated with the provided block ID. - error: An error indicating any issues with retrieving the start height.
Expected errors during normal operation: - codes.NotFound - if the block was not found in storage - codes.Internal - for any other error
func (*BaseTrackerImpl) GetStartHeightFromHeight ¶
func (b *BaseTrackerImpl) GetStartHeightFromHeight(startHeight uint64) (uint64, error)
GetStartHeightFromHeight returns the start height based on the provided starting block height. If the start block is the root block, skip it and begin from the next block.
Parameters: - startHeight: The height of the starting block.
Returns: - uint64: The start height associated with the provided block height. - error: An error indicating any issues with retrieving the start height.
Expected errors during normal operation: - codes.InvalidArgument - if the start height is less than the root block height. - codes.NotFound - if the header was not found in storage.
func (*BaseTrackerImpl) GetStartHeightFromLatest ¶
func (b *BaseTrackerImpl) GetStartHeightFromLatest(ctx context.Context) (uint64, error)
GetStartHeightFromLatest returns the start height based on the latest sealed block. If the start block is the root block, skip it and begin from the next block.
Parameters: - ctx: Context for the operation.
No errors are expected during normal operation.
type BlockTracker ¶
type BlockTracker interface { BaseTracker // GetHighestHeight returns the highest height based on the specified block status which could be only BlockStatusSealed // or BlockStatusFinalized. // No errors are expected during normal operation. GetHighestHeight(flow.BlockStatus) (uint64, error) // ProcessOnFinalizedBlock drives the subscription logic when a block is finalized. // The input to this callback is treated as trusted. This method should be executed on // `OnFinalizedBlock` notifications from the node-internal consensus instance. // No errors are expected during normal operation. ProcessOnFinalizedBlock() error }
BlockTracker is an interface for tracking blocks and handling block-related operations.
type BlockTrackerImpl ¶
type BlockTrackerImpl struct { BaseTracker // contains filtered or unexported fields }
BlockTrackerImpl is an implementation of the BlockTracker interface.
func NewBlockTracker ¶
func NewBlockTracker( state protocol.State, rootHeight uint64, headers storage.Headers, broadcaster *engine.Broadcaster, ) (*BlockTrackerImpl, error)
NewBlockTracker creates a new BlockTrackerImpl instance.
Parameters: - state: The protocol state used for retrieving block information. - rootHeight: The root block height, serving as the baseline for calculating the start height. - headers: The storage headers for accessing block headers. - broadcaster: The engine broadcaster for publishing notifications.
No errors are expected during normal operation.
func (*BlockTrackerImpl) GetHighestHeight ¶
func (b *BlockTrackerImpl) GetHighestHeight(blockStatus flow.BlockStatus) (uint64, error)
GetHighestHeight returns the highest height based on the specified block status.
Parameters: - blockStatus: The status of the block. It is expected that blockStatus has already been handled for invalid flow.BlockStatusUnknown.
Expected errors during normal operation: - codes.InvalidArgument - if block status is flow.BlockStatusUnknown.
func (*BlockTrackerImpl) ProcessOnFinalizedBlock ¶
func (b *BlockTrackerImpl) ProcessOnFinalizedBlock() error
ProcessOnFinalizedBlock drives the subscription logic when a block is finalized. The input to this callback is treated as trusted. This method should be executed on `OnFinalizedBlock` notifications from the node-internal consensus instance. No errors are expected during normal operation. Any errors encountered should be treated as an exception.
type ExecutionDataTracker ¶
type ExecutionDataTracker interface { BaseTracker // GetStartHeight returns the start height to use when searching. // Only one of startBlockID and startHeight may be set. Otherwise, an InvalidArgument error is returned. // If a block is provided and does not exist, a NotFound error is returned. // If neither startBlockID nor startHeight is provided, the latest sealed block is used. // If the start block is the root block, skip it and begin from the next block. // // Parameters: // - ctx: Context for the operation. // - startBlockID: The identifier of the starting block. If provided, startHeight should be 0. // - startHeight: The height of the starting block. If provided, startBlockID should be flow.ZeroID. // // Returns: // - uint64: The start height for searching. // - error: An error indicating the result of the operation, if any. // // Expected errors during normal operation: // - codes.InvalidArgument - if both startBlockID and startHeight are provided, if the start height is less than the root block height, // if the start height is out of bounds based on indexed heights (when index is used). // - codes.NotFound - if a block is provided and does not exist. // - codes.Internal - if there is an internal error. GetStartHeight(context.Context, flow.Identifier, uint64) (uint64, error) // GetHighestHeight returns the highest height that we have consecutive execution data for. GetHighestHeight() uint64 // OnExecutionData is used to notify the tracker when a new execution data is received. OnExecutionData(*execution_data.BlockExecutionDataEntity) }
ExecutionDataTracker is an interface for tracking the highest consecutive block height for which we have received a new Execution Data notification
type ExecutionDataTrackerImpl ¶
type ExecutionDataTrackerImpl struct { BaseTracker // contains filtered or unexported fields }
ExecutionDataTrackerImpl is an implementation of the ExecutionDataTracker interface.
func NewExecutionDataTracker ¶
func NewExecutionDataTracker( log zerolog.Logger, state protocol.State, rootHeight uint64, headers storage.Headers, broadcaster *engine.Broadcaster, highestAvailableFinalizedHeight uint64, indexReporter state_synchronization.IndexReporter, useIndex bool, ) *ExecutionDataTrackerImpl
NewExecutionDataTracker creates a new ExecutionDataTrackerImpl instance.
Parameters: - log: The logger to use for logging. - state: The protocol state used for retrieving block information. - rootHeight: The root block height, serving as the baseline for calculating the start height. - headers: The storage headers for accessing block headers. - broadcaster: The engine broadcaster for publishing notifications. - highestAvailableFinalizedHeight: The highest available finalized block height. - indexReporter: The index reporter for checking indexed block heights. - useIndex: A flag indicating whether to use indexed block heights for validation.
Returns: - *ExecutionDataTrackerImpl: A new instance of ExecutionDataTrackerImpl.
func (*ExecutionDataTrackerImpl) GetHighestHeight ¶
func (e *ExecutionDataTrackerImpl) GetHighestHeight() uint64
GetHighestHeight returns the highest height that we have consecutive execution data for.
func (*ExecutionDataTrackerImpl) GetStartHeight ¶
func (e *ExecutionDataTrackerImpl) GetStartHeight(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) (uint64, error)
GetStartHeight returns the start height to use when searching. Only one of startBlockID and startHeight may be set. Otherwise, an InvalidArgument error is returned. If a block is provided and does not exist, a NotFound error is returned. If neither startBlockID nor startHeight is provided, the latest sealed block is used. If the start block is the root block, skip it and begin from the next block.
Parameters: - ctx: Context for the operation. - startBlockID: The identifier of the starting block. If provided, startHeight should be 0. - startHeight: The height of the starting block. If provided, startBlockID should be flow.ZeroID.
Returns: - uint64: The start height for searching. - error: An error indicating the result of the operation, if any.
Expected errors during normal operation: - codes.InvalidArgument - if both startBlockID and startHeight are provided, if the start height is less than the root block height, if the start height is out of bounds based on indexed heights (when index is used). - codes.NotFound - if a block is provided and does not exist. - codes.Internal - if there is an internal error.
func (*ExecutionDataTrackerImpl) GetStartHeightFromBlockID ¶
func (e *ExecutionDataTrackerImpl) GetStartHeightFromBlockID(startBlockID flow.Identifier) (uint64, error)
GetStartHeightFromBlockID returns the start height based on the provided starting block ID.
Parameters: - startBlockID: The identifier of the starting block.
Returns: - uint64: The start height associated with the provided block ID. - error: An error indicating any issues with retrieving the start height.
Expected errors during normal operation: - codes.NotFound - if the block was not found in storage - codes.InvalidArgument - if the start height is out of bounds based on indexed heights. - codes.FailedPrecondition - if the index reporter is not ready yet. - codes.Internal - for any other error during validation.
func (*ExecutionDataTrackerImpl) GetStartHeightFromHeight ¶
func (e *ExecutionDataTrackerImpl) GetStartHeightFromHeight(startHeight uint64) (uint64, error)
GetStartHeightFromHeight returns the start height based on the provided starting block height.
Parameters: - startHeight: The height of the starting block.
Returns: - uint64: The start height associated with the provided block height. - error: An error indicating any issues with retrieving the start height.
Expected errors during normal operation: - codes.InvalidArgument - if the start height is less than the root block height, if the start height is out of bounds based on indexed heights - codes.NotFound - if the header was not found in storage. - codes.FailedPrecondition - if the index reporter is not ready yet. - codes.Internal - for any other error during validation.
func (*ExecutionDataTrackerImpl) GetStartHeightFromLatest ¶
func (e *ExecutionDataTrackerImpl) GetStartHeightFromLatest(ctx context.Context) (uint64, error)
GetStartHeightFromLatest returns the start height based on the latest sealed block.
Parameters: - ctx: Context for the operation.
Expected errors during normal operation: - codes.InvalidArgument - if the start height is out of bounds based on indexed heights. - codes.FailedPrecondition - if the index reporter is not ready yet. - codes.Internal - for any other error during validation.
func (*ExecutionDataTrackerImpl) OnExecutionData ¶
func (e *ExecutionDataTrackerImpl) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)
OnExecutionData is used to notify the tracker when a new execution data is received.
type GetDataByHeightFunc ¶
GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: - storage.ErrNotFound - execution_data.BlobNotFoundError All other errors are considered exceptions
type HeightBasedSubscription ¶
type HeightBasedSubscription struct { *SubscriptionImpl // contains filtered or unexported fields }
HeightBasedSubscription is a subscription that retrieves data sequentially by block height
func NewHeightBasedSubscription ¶
func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription
type Streamable ¶
type Streamable interface { // ID returns the subscription ID // Note: this is not a cryptographic hash ID() string // Close is called when a subscription ends gracefully, and closes the subscription channel Close() // Fail registers an error and closes the subscription channel Fail(error) // Send sends a value to the subscription channel or returns an error // Expected errors: // - context.DeadlineExceeded if send timed out // - context.Canceled if the client disconnected Send(context.Context, interface{}, time.Duration) error // Next returns the value for the next height from the subscription Next(context.Context) (interface{}, error) }
Streamable represents a subscription that can be streamed.
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer represents a streaming subscription that delivers data to clients.
func NewStreamer ¶
func NewStreamer( log zerolog.Logger, broadcaster *engine.Broadcaster, sendTimeout time.Duration, limit float64, sub Streamable, ) *Streamer
NewStreamer creates a new Streamer instance.
type StreamingData ¶
StreamingData represents common streaming data configuration for access and state_stream handlers.
func NewStreamingData ¶
func NewStreamingData(maxStreams uint32) StreamingData
type Subscription ¶
type Subscription interface { // ID returns the unique identifier for this subscription used for logging ID() string // Channel returns the channel from which subscription data can be read Channel() <-chan interface{} // Err returns the error that caused the subscription to fail Err() error }
Subscription represents a streaming request, and handles the communication between the grpc handler and the backend implementation.
type SubscriptionHandler ¶
type SubscriptionHandler struct {
// contains filtered or unexported fields
}
SubscriptionHandler represents common streaming data configuration for creating streaming subscription.
func NewSubscriptionHandler ¶
func NewSubscriptionHandler( log zerolog.Logger, broadcaster *engine.Broadcaster, sendTimeout time.Duration, responseLimit float64, sendBufferSize uint, ) *SubscriptionHandler
NewSubscriptionHandler creates a new SubscriptionHandler instance.
Parameters: - log: The logger to use for logging. - broadcaster: The engine broadcaster for publishing notifications. - sendTimeout: The duration after which a send operation will timeout. - responseLimit: The maximum allowed response time for a single stream. - sendBufferSize: The size of the response buffer for sending messages to the client.
Returns a new SubscriptionHandler instance.
func (*SubscriptionHandler) Subscribe ¶
func (h *SubscriptionHandler) Subscribe( ctx context.Context, startHeight uint64, getData GetDataByHeightFunc, ) Subscription
Subscribe creates and starts a new subscription.
Parameters: - ctx: The context for the operation. - startHeight: The height to start subscription from. - getData: The function to retrieve data by height.
type SubscriptionImpl ¶
type SubscriptionImpl struct {
// contains filtered or unexported fields
}
func NewFailedSubscription ¶
func NewFailedSubscription(err error, msg string) *SubscriptionImpl
NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.
func NewSubscription ¶
func NewSubscription(bufferSize int) *SubscriptionImpl
func (*SubscriptionImpl) Channel ¶
func (sub *SubscriptionImpl) Channel() <-chan interface{}
Channel returns the channel from which subscription data can be read
func (*SubscriptionImpl) Close ¶
func (sub *SubscriptionImpl) Close()
Close is called when a subscription ends gracefully, and closes the subscription channel
func (*SubscriptionImpl) Err ¶
func (sub *SubscriptionImpl) Err() error
Err returns the error that caused the subscription to fail
func (*SubscriptionImpl) Fail ¶
func (sub *SubscriptionImpl) Fail(err error)
Fail registers an error and closes the subscription channel
func (*SubscriptionImpl) ID ¶
func (sub *SubscriptionImpl) ID() string
ID returns the subscription ID Note: this is not a cryptographic hash