Documentation ¶
Index ¶
- type Config
- type Engine
- type EventsBackend
- type EventsResponse
- type ExecutionDataBackend
- type ExecutionDataResponse
- type GetDataByHeightFunc
- type GetExecutionDataFunc
- type GetStartHeightFunc
- type Handler
- func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *executiondata.GetExecutionDataByBlockIDRequest) (*executiondata.GetExecutionDataByBlockIDResponse, error)
- func (h *Handler) GetRegisterValues(_ context.Context, request *executiondata.GetRegisterValuesRequest) (*executiondata.GetRegisterValuesResponse, error)
- func (h *Handler) SubscribeEvents(request *executiondata.SubscribeEventsRequest, ...) error
- func (h *Handler) SubscribeExecutionData(request *executiondata.SubscribeExecutionDataRequest, ...) error
- type HeightBasedSubscription
- type StateStreamBackend
- type Streamer
- type SubscriptionImpl
- func (sub *SubscriptionImpl) Channel() <-chan interface{}
- func (sub *SubscriptionImpl) Close()
- func (sub *SubscriptionImpl) Err() error
- func (sub *SubscriptionImpl) Fail(err error)
- func (sub *SubscriptionImpl) ID() string
- func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { state_stream.EventFilterConfig // ListenAddr is the address the GRPC server will listen on as host:port ListenAddr string // MaxExecutionDataMsgSize is the max message size for block execution data API MaxExecutionDataMsgSize uint // RpcMetricsEnabled specifies whether to enable the GRPC metrics RpcMetricsEnabled bool // MaxGlobalStreams defines the global max number of streams that can be open at the same time. MaxGlobalStreams uint32 // RegisterIDsRequestLimit defines the max number of register IDs that can be received in a single request. RegisterIDsRequestLimit uint32 // ExecutionDataCacheSize is the max number of objects for the execution data cache. ExecutionDataCacheSize uint32 // ClientSendTimeout is the timeout for sending a message to the client. After the timeout, // the stream is closed with an error. ClientSendTimeout time.Duration // ClientSendBufferSize is the size of the response buffer for sending messages to the client. ClientSendBufferSize uint // ResponseLimit is the max responses per second allowed on a stream. After exceeding the limit, // the stream is paused until more capacity is available. Searches of past data can be CPU // intensive, so this helps manage the impact. ResponseLimit float64 // HeartbeatInterval specifies the block interval at which heartbeat messages should be sent. HeartbeatInterval uint64 }
Config defines the configurable options for the ingress server.
type Engine ¶
type Engine struct { *component.ComponentManager // contains filtered or unexported fields }
Engine exposes the server with the state stream API. By default, this engine is not enabled. In order to run this engine a port for the GRPC server to be served on should be specified in the run config.
func NewEng ¶
func NewEng( log zerolog.Logger, config Config, execDataCache *cache.ExecutionDataCache, headers storage.Headers, chainID flow.ChainID, server *grpcserver.GrpcServer, backend *StateStreamBackend, broadcaster *engine.Broadcaster, ) (*Engine, error)
NewEng returns a new ingress server.
func (*Engine) OnExecutionData ¶
func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)
OnExecutionData is called to notify the engine when a new execution data is received. The caller must guarantee that execution data is locally available for all blocks with heights between the initialBlockHeight provided during startup and the block height of the execution data provided.
type EventsBackend ¶
type EventsBackend struct {
// contains filtered or unexported fields
}
func (EventsBackend) SubscribeEvents ¶
func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) state_stream.Subscription
type EventsResponse ¶
type EventsResponse struct { BlockID flow.Identifier Height uint64 Events flow.EventsList }
type ExecutionDataBackend ¶
type ExecutionDataBackend struct {
// contains filtered or unexported fields
}
func (*ExecutionDataBackend) GetExecutionDataByBlockID ¶
func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
func (*ExecutionDataBackend) SubscribeExecutionData ¶
func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) state_stream.Subscription
type ExecutionDataResponse ¶
type ExecutionDataResponse struct { Height uint64 ExecutionData *execution_data.BlockExecutionData }
type GetDataByHeightFunc ¶
GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: - storage.ErrNotFound - execution_data.BlobNotFoundError All other errors are considered exceptions
type GetExecutionDataFunc ¶
type GetExecutionDataFunc func(context.Context, uint64) (*execution_data.BlockExecutionDataEntity, error)
type GetStartHeightFunc ¶
type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
func NewHandler ¶
func (*Handler) GetExecutionDataByBlockID ¶
func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *executiondata.GetExecutionDataByBlockIDRequest) (*executiondata.GetExecutionDataByBlockIDResponse, error)
func (*Handler) GetRegisterValues ¶
func (h *Handler) GetRegisterValues(_ context.Context, request *executiondata.GetRegisterValuesRequest) (*executiondata.GetRegisterValuesResponse, error)
func (*Handler) SubscribeEvents ¶
func (h *Handler) SubscribeEvents(request *executiondata.SubscribeEventsRequest, stream executiondata.ExecutionDataAPI_SubscribeEventsServer) error
func (*Handler) SubscribeExecutionData ¶
func (h *Handler) SubscribeExecutionData(request *executiondata.SubscribeExecutionDataRequest, stream executiondata.ExecutionDataAPI_SubscribeExecutionDataServer) error
type HeightBasedSubscription ¶
type HeightBasedSubscription struct { *SubscriptionImpl // contains filtered or unexported fields }
HeightBasedSubscription is a subscription that retrieves data sequentially by block height
func NewHeightBasedSubscription ¶
func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription
type StateStreamBackend ¶
type StateStreamBackend struct { ExecutionDataBackend EventsBackend // contains filtered or unexported fields }
func New ¶
func New( log zerolog.Logger, config Config, state protocol.State, headers storage.Headers, seals storage.Seals, results storage.ExecutionResults, execDataStore execution_data.ExecutionDataStore, execDataCache *cache.ExecutionDataCache, broadcaster *engine.Broadcaster, rootHeight uint64, highestAvailableHeight uint64, registers *execution.RegistersAsyncStore, ) (*StateStreamBackend, error)
func (*StateStreamBackend) GetRegisterValues ¶ added in v0.32.10
func (b *StateStreamBackend) GetRegisterValues(ids flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error)
GetRegisterValues returns the register values for the given register IDs at the given block height.
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer
func NewStreamer ¶
func NewStreamer( log zerolog.Logger, broadcaster *engine.Broadcaster, sendTimeout time.Duration, limit float64, sub state_stream.Streamable, ) *Streamer
type SubscriptionImpl ¶
type SubscriptionImpl struct {
// contains filtered or unexported fields
}
func NewFailedSubscription ¶
func NewFailedSubscription(err error, msg string) *SubscriptionImpl
NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.
func NewSubscription ¶
func NewSubscription(bufferSize int) *SubscriptionImpl
func (*SubscriptionImpl) Channel ¶
func (sub *SubscriptionImpl) Channel() <-chan interface{}
Channel returns the channel from which subscription data can be read
func (*SubscriptionImpl) Close ¶
func (sub *SubscriptionImpl) Close()
Close is called when a subscription ends gracefully, and closes the subscription channel
func (*SubscriptionImpl) Err ¶
func (sub *SubscriptionImpl) Err() error
Err returns the error that caused the subscription to fail
func (*SubscriptionImpl) Fail ¶
func (sub *SubscriptionImpl) Fail(err error)
Fail registers an error and closes the subscription channel
func (*SubscriptionImpl) ID ¶
func (sub *SubscriptionImpl) ID() string
ID returns the subscription ID Note: this is not a cryptographic hash