subscription

package
v0.38.0-preview.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: AGPL-3.0 Imports: 23 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// DefaultSendBufferSize is the default buffer size for the subscription's send channel.
	// The size is chosen to balance memory overhead from each subscription with performance when
	// streaming existing data.
	DefaultSendBufferSize = 10

	// DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time.
	DefaultMaxGlobalStreams = 1000

	// DefaultCacheSize defines the default max number of objects for the execution data cache.
	DefaultCacheSize = 100

	// DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout
	// expires, the connection is closed.
	DefaultSendTimeout = 30 * time.Second

	// DefaultResponseLimit is default max responses per second allowed on a stream. After exceeding
	// the limit, the stream is paused until more capacity is available.
	DefaultResponseLimit = float64(0)

	// DefaultHeartbeatInterval specifies the block interval at which heartbeat messages should be sent.
	DefaultHeartbeatInterval = 1
)

Variables

View Source
var ErrBlockNotReady = errors.New("block not ready")

ErrBlockNotReady represents an error indicating that a block is not yet available or ready.

View Source
var ErrEndOfData = errors.New("end of data")

ErrEndOfData represents an error indicating that no more data available for streaming.

Functions

func HandleRPCSubscription

func HandleRPCSubscription[T any](sub Subscription, handleResponse func(resp T) error) error

HandleRPCSubscription is a generic handler for subscriptions to a specific type for rpc calls.

Parameters: - sub: The subscription. - handleResponse: The function responsible for handling the response of the subscribed type.

Expected errors during normal operation:

  • codes.Internal: If the subscription encounters an error or gets an unexpected response.

func HandleResponse

func HandleResponse[T any](send chan<- interface{}, transform func(resp T) (interface{}, error)) func(resp T) error

HandleResponse processes a generic response of type and sends it to the provided channel.

Parameters: - send: The channel to which the processed response is sent. - transform: A function to transform the response into the expected interface{} type.

No errors are expected during normal operations.

func HandleSubscription

func HandleSubscription[T any](sub Subscription, handleResponse func(resp T) error) error

HandleSubscription is a generic handler for subscriptions to a specific type. It continuously listens to the subscription channel, handles the received responses, and sends the processed information to the client via the provided stream using handleResponse.

Parameters: - sub: The subscription. - handleResponse: The function responsible for handling the response of the subscribed type.

No errors are expected during normal operations.

Types

type BaseTracker

type BaseTracker interface {
	// GetStartHeightFromBlockID returns the start height based on the provided starting block ID.
	// If the start block is the root block, skip it and begin from the next block.
	//
	// Parameters:
	// - startBlockID: The identifier of the starting block.
	//
	// Returns:
	// - uint64: The start height associated with the provided block ID.
	// - error: An error indicating any issues with retrieving the start height.
	//
	// Expected errors during normal operation:
	// - codes.NotFound - if the block was not found in storage
	// - codes.Internal - for any other error
	GetStartHeightFromBlockID(flow.Identifier) (uint64, error)
	// GetStartHeightFromHeight returns the start height based on the provided starting block height.
	// If the start block is the root block, skip it and begin from the next block.
	//
	// Parameters:
	// - startHeight: The height of the starting block.
	//
	// Returns:
	// - uint64: The start height associated with the provided block height.
	// - error: An error indicating any issues with retrieving the start height.
	//
	// Expected errors during normal operation:
	// - codes.InvalidArgument   - if the start height is less than the root block height.
	// - codes.NotFound  - if the header was not found in storage.
	GetStartHeightFromHeight(uint64) (uint64, error)
	// GetStartHeightFromLatest returns the start height based on the latest sealed block.
	// If the start block is the root block, skip it and begin from the next block.
	//
	// Parameters:
	// - ctx: Context for the operation.
	//
	// No errors are expected during normal operation.
	GetStartHeightFromLatest(context.Context) (uint64, error)
}

BaseTracker is an interface for a tracker that provides base GetStartHeight method related to both blocks and execution data tracking.

type BaseTrackerImpl

type BaseTrackerImpl struct {
	// contains filtered or unexported fields
}

BaseTrackerImpl is an implementation of the BaseTracker interface.

func NewBaseTrackerImpl

func NewBaseTrackerImpl(
	rootBlockHeight uint64,
	state protocol.State,
	headers storage.Headers,
) *BaseTrackerImpl

NewBaseTrackerImpl creates a new instance of BaseTrackerImpl.

Parameters: - rootBlockHeight: The root block height, which serves as the baseline for calculating the start height. - state: The protocol state used for retrieving block information. - headers: The storage headers for accessing block headers.

Returns: - *BaseTrackerImpl: A new instance of BaseTrackerImpl.

func (*BaseTrackerImpl) GetStartHeightFromBlockID

func (b *BaseTrackerImpl) GetStartHeightFromBlockID(startBlockID flow.Identifier) (uint64, error)

GetStartHeightFromBlockID returns the start height based on the provided starting block ID. If the start block is the root block, skip it and begin from the next block.

Parameters: - startBlockID: The identifier of the starting block.

Returns: - uint64: The start height associated with the provided block ID. - error: An error indicating any issues with retrieving the start height.

Expected errors during normal operation: - codes.NotFound - if the block was not found in storage - codes.Internal - for any other error

func (*BaseTrackerImpl) GetStartHeightFromHeight

func (b *BaseTrackerImpl) GetStartHeightFromHeight(startHeight uint64) (uint64, error)

GetStartHeightFromHeight returns the start height based on the provided starting block height. If the start block is the root block, skip it and begin from the next block.

Parameters: - startHeight: The height of the starting block.

Returns: - uint64: The start height associated with the provided block height. - error: An error indicating any issues with retrieving the start height.

Expected errors during normal operation: - codes.InvalidArgument - if the start height is less than the root block height. - codes.NotFound - if the header was not found in storage.

func (*BaseTrackerImpl) GetStartHeightFromLatest

func (b *BaseTrackerImpl) GetStartHeightFromLatest(ctx context.Context) (uint64, error)

GetStartHeightFromLatest returns the start height based on the latest sealed block. If the start block is the root block, skip it and begin from the next block.

Parameters: - ctx: Context for the operation.

No errors are expected during normal operation.

type BlockTracker

type BlockTracker interface {
	BaseTracker
	// GetHighestHeight returns the highest height based on the specified block status which could be only BlockStatusSealed
	// or BlockStatusFinalized.
	// No errors are expected during normal operation.
	GetHighestHeight(flow.BlockStatus) (uint64, error)
	// ProcessOnFinalizedBlock drives the subscription logic when a block is finalized.
	// The input to this callback is treated as trusted. This method should be executed on
	// `OnFinalizedBlock` notifications from the node-internal consensus instance.
	// No errors are expected during normal operation.
	ProcessOnFinalizedBlock() error
}

BlockTracker is an interface for tracking blocks and handling block-related operations.

type BlockTrackerImpl

type BlockTrackerImpl struct {
	BaseTracker
	// contains filtered or unexported fields
}

BlockTrackerImpl is an implementation of the BlockTracker interface.

func NewBlockTracker

func NewBlockTracker(
	state protocol.State,
	rootHeight uint64,
	headers storage.Headers,
	broadcaster *engine.Broadcaster,
) (*BlockTrackerImpl, error)

NewBlockTracker creates a new BlockTrackerImpl instance.

Parameters: - state: The protocol state used for retrieving block information. - rootHeight: The root block height, serving as the baseline for calculating the start height. - headers: The storage headers for accessing block headers. - broadcaster: The engine broadcaster for publishing notifications.

No errors are expected during normal operation.

func (*BlockTrackerImpl) GetHighestHeight

func (b *BlockTrackerImpl) GetHighestHeight(blockStatus flow.BlockStatus) (uint64, error)

GetHighestHeight returns the highest height based on the specified block status.

Parameters: - blockStatus: The status of the block. It is expected that blockStatus has already been handled for invalid flow.BlockStatusUnknown.

Expected errors during normal operation: - codes.InvalidArgument - if block status is flow.BlockStatusUnknown.

func (*BlockTrackerImpl) ProcessOnFinalizedBlock

func (b *BlockTrackerImpl) ProcessOnFinalizedBlock() error

ProcessOnFinalizedBlock drives the subscription logic when a block is finalized. The input to this callback is treated as trusted. This method should be executed on `OnFinalizedBlock` notifications from the node-internal consensus instance. No errors are expected during normal operation. Any errors encountered should be treated as an exception.

type ExecutionDataTracker

type ExecutionDataTracker interface {
	BaseTracker
	// GetStartHeight returns the start height to use when searching.
	// Only one of startBlockID and startHeight may be set. Otherwise, an InvalidArgument error is returned.
	// If a block is provided and does not exist, a NotFound error is returned.
	// If neither startBlockID nor startHeight is provided, the latest sealed block is used.
	// If the start block is the root block, skip it and begin from the next block.
	//
	// Parameters:
	// - ctx: Context for the operation.
	// - startBlockID: The identifier of the starting block. If provided, startHeight should be 0.
	// - startHeight: The height of the starting block. If provided, startBlockID should be flow.ZeroID.
	//
	// Returns:
	// - uint64: The start height for searching.
	// - error: An error indicating the result of the operation, if any.
	//
	// Expected errors during normal operation:
	// - codes.InvalidArgument - if both startBlockID and startHeight are provided, if the start height is less than the root block height,
	// if the start height is out of bounds based on indexed heights (when index is used).
	// - codes.NotFound   - if a block is provided and does not exist.
	// - codes.Internal        - if there is an internal error.
	GetStartHeight(context.Context, flow.Identifier, uint64) (uint64, error)
	// GetHighestHeight returns the highest height that we have consecutive execution data for.
	GetHighestHeight() uint64
	// OnExecutionData is used to notify the tracker when a new execution data is received.
	OnExecutionData(*execution_data.BlockExecutionDataEntity)
}

ExecutionDataTracker is an interface for tracking the highest consecutive block height for which we have received a new Execution Data notification

type ExecutionDataTrackerImpl

type ExecutionDataTrackerImpl struct {
	BaseTracker
	// contains filtered or unexported fields
}

ExecutionDataTrackerImpl is an implementation of the ExecutionDataTracker interface.

func NewExecutionDataTracker

func NewExecutionDataTracker(
	log zerolog.Logger,
	state protocol.State,
	rootHeight uint64,
	headers storage.Headers,
	broadcaster *engine.Broadcaster,
	highestAvailableFinalizedHeight uint64,
	indexReporter state_synchronization.IndexReporter,
	useIndex bool,
) *ExecutionDataTrackerImpl

NewExecutionDataTracker creates a new ExecutionDataTrackerImpl instance.

Parameters: - log: The logger to use for logging. - state: The protocol state used for retrieving block information. - rootHeight: The root block height, serving as the baseline for calculating the start height. - headers: The storage headers for accessing block headers. - broadcaster: The engine broadcaster for publishing notifications. - highestAvailableFinalizedHeight: The highest available finalized block height. - indexReporter: The index reporter for checking indexed block heights. - useIndex: A flag indicating whether to use indexed block heights for validation.

Returns: - *ExecutionDataTrackerImpl: A new instance of ExecutionDataTrackerImpl.

func (*ExecutionDataTrackerImpl) GetHighestHeight

func (e *ExecutionDataTrackerImpl) GetHighestHeight() uint64

GetHighestHeight returns the highest height that we have consecutive execution data for.

func (*ExecutionDataTrackerImpl) GetStartHeight

func (e *ExecutionDataTrackerImpl) GetStartHeight(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) (uint64, error)

GetStartHeight returns the start height to use when searching. Only one of startBlockID and startHeight may be set. Otherwise, an InvalidArgument error is returned. If a block is provided and does not exist, a NotFound error is returned. If neither startBlockID nor startHeight is provided, the latest sealed block is used. If the start block is the root block, skip it and begin from the next block.

Parameters: - ctx: Context for the operation. - startBlockID: The identifier of the starting block. If provided, startHeight should be 0. - startHeight: The height of the starting block. If provided, startBlockID should be flow.ZeroID.

Returns: - uint64: The start height for searching. - error: An error indicating the result of the operation, if any.

Expected errors during normal operation: - codes.InvalidArgument - if both startBlockID and startHeight are provided, if the start height is less than the root block height, if the start height is out of bounds based on indexed heights (when index is used). - codes.NotFound - if a block is provided and does not exist. - codes.Internal - if there is an internal error.

func (*ExecutionDataTrackerImpl) GetStartHeightFromBlockID

func (e *ExecutionDataTrackerImpl) GetStartHeightFromBlockID(startBlockID flow.Identifier) (uint64, error)

GetStartHeightFromBlockID returns the start height based on the provided starting block ID.

Parameters: - startBlockID: The identifier of the starting block.

Returns: - uint64: The start height associated with the provided block ID. - error: An error indicating any issues with retrieving the start height.

Expected errors during normal operation: - codes.NotFound - if the block was not found in storage - codes.InvalidArgument - if the start height is out of bounds based on indexed heights. - codes.FailedPrecondition - if the index reporter is not ready yet. - codes.Internal - for any other error during validation.

func (*ExecutionDataTrackerImpl) GetStartHeightFromHeight

func (e *ExecutionDataTrackerImpl) GetStartHeightFromHeight(startHeight uint64) (uint64, error)

GetStartHeightFromHeight returns the start height based on the provided starting block height.

Parameters: - startHeight: The height of the starting block.

Returns: - uint64: The start height associated with the provided block height. - error: An error indicating any issues with retrieving the start height.

Expected errors during normal operation: - codes.InvalidArgument - if the start height is less than the root block height, if the start height is out of bounds based on indexed heights - codes.NotFound - if the header was not found in storage. - codes.FailedPrecondition - if the index reporter is not ready yet. - codes.Internal - for any other error during validation.

func (*ExecutionDataTrackerImpl) GetStartHeightFromLatest

func (e *ExecutionDataTrackerImpl) GetStartHeightFromLatest(ctx context.Context) (uint64, error)

GetStartHeightFromLatest returns the start height based on the latest sealed block.

Parameters: - ctx: Context for the operation.

Expected errors during normal operation: - codes.InvalidArgument - if the start height is out of bounds based on indexed heights. - codes.FailedPrecondition - if the index reporter is not ready yet. - codes.Internal - for any other error during validation.

func (*ExecutionDataTrackerImpl) OnExecutionData

func (e *ExecutionDataTrackerImpl) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)

OnExecutionData is used to notify the tracker when a new execution data is received.

type GetDataByHeightFunc

type GetDataByHeightFunc func(ctx context.Context, height uint64) (interface{}, error)

GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: - storage.ErrNotFound - execution_data.BlobNotFoundError All other errors are considered exceptions

type HeightBasedSubscription

type HeightBasedSubscription struct {
	*SubscriptionImpl
	// contains filtered or unexported fields
}

HeightBasedSubscription is a subscription that retrieves data sequentially by block height

func NewHeightBasedSubscription

func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription

func (*HeightBasedSubscription) Next

func (s *HeightBasedSubscription) Next(ctx context.Context) (interface{}, error)

Next returns the value for the next height from the subscription

type Streamable

type Streamable interface {
	// ID returns the subscription ID
	// Note: this is not a cryptographic hash
	ID() string
	// Close is called when a subscription ends gracefully, and closes the subscription channel
	Close()
	// Fail registers an error and closes the subscription channel
	Fail(error)
	// Send sends a value to the subscription channel or returns an error
	// Expected errors:
	// - context.DeadlineExceeded if send timed out
	// - context.Canceled if the client disconnected
	Send(context.Context, interface{}, time.Duration) error
	// Next returns the value for the next height from the subscription
	Next(context.Context) (interface{}, error)
}

Streamable represents a subscription that can be streamed.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer represents a streaming subscription that delivers data to clients.

func NewStreamer

func NewStreamer(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	limit float64,
	sub Streamable,
) *Streamer

NewStreamer creates a new Streamer instance.

func (*Streamer) Stream

func (s *Streamer) Stream(ctx context.Context)

Stream is a blocking method that streams data to the subscription until either the context is cancelled or it encounters an error.

type StreamingData

type StreamingData struct {
	MaxStreams  int32
	StreamCount atomic.Int32
}

StreamingData represents common streaming data configuration for access and state_stream handlers.

func NewStreamingData

func NewStreamingData(maxStreams uint32) StreamingData

type Subscription

type Subscription interface {
	// ID returns the unique identifier for this subscription used for logging
	ID() string

	// Channel returns the channel from which subscription data can be read
	Channel() <-chan interface{}

	// Err returns the error that caused the subscription to fail
	Err() error
}

Subscription represents a streaming request, and handles the communication between the grpc handler and the backend implementation.

type SubscriptionHandler

type SubscriptionHandler struct {
	// contains filtered or unexported fields
}

SubscriptionHandler represents common streaming data configuration for creating streaming subscription.

func NewSubscriptionHandler

func NewSubscriptionHandler(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	responseLimit float64,
	sendBufferSize uint,
) *SubscriptionHandler

NewSubscriptionHandler creates a new SubscriptionHandler instance.

Parameters: - log: The logger to use for logging. - broadcaster: The engine broadcaster for publishing notifications. - sendTimeout: The duration after which a send operation will timeout. - responseLimit: The maximum allowed response time for a single stream. - sendBufferSize: The size of the response buffer for sending messages to the client.

Returns a new SubscriptionHandler instance.

func (*SubscriptionHandler) Subscribe

func (h *SubscriptionHandler) Subscribe(
	ctx context.Context,
	startHeight uint64,
	getData GetDataByHeightFunc,
) Subscription

Subscribe creates and starts a new subscription.

Parameters: - ctx: The context for the operation. - startHeight: The height to start subscription from. - getData: The function to retrieve data by height.

type SubscriptionImpl

type SubscriptionImpl struct {
	// contains filtered or unexported fields
}

func NewFailedSubscription

func NewFailedSubscription(err error, msg string) *SubscriptionImpl

NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.

func NewSubscription

func NewSubscription(bufferSize int) *SubscriptionImpl

func (*SubscriptionImpl) Channel

func (sub *SubscriptionImpl) Channel() <-chan interface{}

Channel returns the channel from which subscription data can be read

func (*SubscriptionImpl) Close

func (sub *SubscriptionImpl) Close()

Close is called when a subscription ends gracefully, and closes the subscription channel

func (*SubscriptionImpl) Err

func (sub *SubscriptionImpl) Err() error

Err returns the error that caused the subscription to fail

func (*SubscriptionImpl) Fail

func (sub *SubscriptionImpl) Fail(err error)

Fail registers an error and closes the subscription channel

func (*SubscriptionImpl) ID

func (sub *SubscriptionImpl) ID() string

ID returns the subscription ID Note: this is not a cryptographic hash

func (*SubscriptionImpl) Send

func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error

Send sends a value to the subscription channel or returns an error Expected errors: - context.DeadlineExceeded if send timed out - context.Canceled if the client disconnected

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL