Documentation ¶
Index ¶
- Constants
- Variables
- type API
- type Config
- type Engine
- type EventFilter
- type EventFilterConfig
- type EventsBackend
- type EventsResponse
- type ExecutionDataBackend
- type ExecutionDataResponse
- type GetDataByHeightFunc
- type GetExecutionDataFunc
- type GetStartHeightFunc
- type Handler
- func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *access.GetExecutionDataByBlockIDRequest) (*access.GetExecutionDataByBlockIDResponse, error)
- func (h *Handler) SubscribeEvents(request *access.SubscribeEventsRequest, ...) error
- func (h *Handler) SubscribeExecutionData(request *access.SubscribeExecutionDataRequest, ...) error
- type HeightBasedSubscription
- type ParsedEvent
- type ParsedEventType
- type StateStreamBackend
- type Streamable
- type Streamer
- type Subscription
- type SubscriptionImpl
- func (sub *SubscriptionImpl) Channel() <-chan interface{}
- func (sub *SubscriptionImpl) Close()
- func (sub *SubscriptionImpl) Err() error
- func (sub *SubscriptionImpl) Fail(err error)
- func (sub *SubscriptionImpl) ID() string
- func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error
Constants ¶
const (
	// DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time.
	DefaultMaxGlobalStreams = 1000
	// DefaultCacheSize defines the default max number of objects for the execution data cache.
	DefaultCacheSize = 100
	// DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout
	// expires, the connection is closed.
	DefaultSendTimeout = 30 * time.Second
)
const (
	// DefaultMaxEventTypes is the default maximum number of event types that can be specified in a filter
	DefaultMaxEventTypes = 1000
	// DefaultMaxAddresses is the default maximum number of addresses that can be specified in a filter
	DefaultMaxAddresses = 1000
	// DefaultMaxContracts is the default maximum number of contracts that can be specified in a filter
	DefaultMaxContracts = 1000
)
const DefaultSendBufferSize = 10
DefaultSendBufferSize is the default buffer size for the subscription's send channel. The size is chosen to balance memory overhead from each subscription with performance when streaming existing data.
Variables ¶
var DefaultEventFilterConfig = EventFilterConfig{
	MaxEventTypes: DefaultMaxEventTypes,
	MaxAddresses:  DefaultMaxAddresses,
	MaxContracts:  DefaultMaxContracts,
}
DefaultEventFilterConfig is the default configuration for EventFilters
Functions ¶
This section is empty.
Types ¶
type API ¶
type API interface {
	GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
	SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startBlockHeight uint64) Subscription
	SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
}
type Config ¶
type Config struct {
	EventFilterConfig
	// ListenAddr is the address the GRPC server will listen on as host:port
	ListenAddr string
	// MaxExecutionDataMsgSize is the max message size for block execution data API
	MaxExecutionDataMsgSize uint
	// RpcMetricsEnabled specifies whether to enable the GRPC metrics
	RpcMetricsEnabled bool
	// MaxGlobalStreams defines the global max number of streams that can be open at the same time.
	MaxGlobalStreams uint32
	// ExecutionDataCacheSize is the max number of objects for the execution data cache.
	ExecutionDataCacheSize uint32
	// ClientSendTimeout is the timeout for sending a message to the client. After the timeout,
	// the stream is closed with an error.
	ClientSendTimeout time.Duration
	// ClientSendBufferSize is the size of the response buffer for sending messages to the client.
	ClientSendBufferSize uint
}
Config defines the configurable options for the ingress server.
type Engine ¶
type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}
Engine exposes the server with the state stream API. By default, this engine is not enabled. To run it, specify a port for the gRPC server in the run config.
func NewEng ¶
func NewEng(
	log zerolog.Logger,
	config Config,
	execDataStore execution_data.ExecutionDataStore,
	state protocol.State,
	headers storage.Headers,
	seals storage.Seals,
	results storage.ExecutionResults,
	chainID flow.ChainID,
	apiRatelimits map[string]int,
	apiBurstLimits map[string]int,
	heroCacheMetrics module.HeroCacheMetrics,
) (*Engine, error)
NewEng returns a new ingress server.
func (*Engine) OnExecutionData ¶ added in v0.30.2
func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)
OnExecutionData is called to notify the engine when new execution data is received.
type EventFilter ¶ added in v0.30.2
type EventFilter struct {
	EventTypes map[flow.EventType]struct{}
	Addresses  map[string]struct{}
	Contracts  map[string]struct{}
	// contains filtered or unexported fields
}
EventFilter represents a filter applied to events for a given subscription
func NewEventFilter ¶ added in v0.30.2
func NewEventFilter(
	config EventFilterConfig,
	chain flow.Chain,
	eventTypes []string,
	addresses []string,
	contracts []string,
) (EventFilter, error)
func (*EventFilter) Filter ¶ added in v0.30.2
func (f *EventFilter) Filter(events flow.EventsList) flow.EventsList
Filter applies all filters to the provided list of events and returns the events that match.
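A set-based filter of this shape can be sketched as follows. The matching rule is an assumption, since this page does not state it: here an event matches when any one of the three sets contains it, and a filter with no entries matches everything. The `event` and `eventFilter` types are simplified, hypothetical stand-ins for `flow.Event` and `EventFilter`.

```go
package main

import "fmt"

// event is a hypothetical, simplified stand-in for flow.Event.
type event struct {
	Type     string // full event type, e.g. "A.01.Token.Deposit"
	Address  string
	Contract string
}

// eventFilter mirrors the set-style fields of EventFilter using plain strings.
type eventFilter struct {
	EventTypes map[string]struct{}
	Addresses  map[string]struct{}
	Contracts  map[string]struct{}
}

// match reports whether e is selected by the filter.
// Assumption: sets are combined with OR, and an empty filter matches all events.
func (f eventFilter) match(e event) bool {
	if len(f.EventTypes) == 0 && len(f.Addresses) == 0 && len(f.Contracts) == 0 {
		return true
	}
	if _, ok := f.EventTypes[e.Type]; ok {
		return true
	}
	if _, ok := f.Addresses[e.Address]; ok {
		return true
	}
	_, ok := f.Contracts[e.Contract]
	return ok
}

// filter returns the events that match, preserving their order.
func (f eventFilter) filter(events []event) []event {
	var matched []event
	for _, e := range events {
		if f.match(e) {
			matched = append(matched, e)
		}
	}
	return matched
}

func main() {
	events := []event{
		{Type: "A.01.Token.Deposit", Address: "01", Contract: "A.01.Token"},
		{Type: "A.02.Other.Ping", Address: "02", Contract: "A.02.Other"},
	}
	f := eventFilter{Contracts: map[string]struct{}{"A.01.Token": {}}}
	fmt.Println("matched events:", len(f.filter(events)))
}
```

Using `map[...]struct{}` as a set gives constant-time membership checks without storing a value per key, which matches the field types shown in EventFilter.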
type EventFilterConfig ¶ added in v0.30.2
EventFilterConfig is used to configure the limits for EventFilters
type EventsBackend ¶ added in v0.30.2
type EventsBackend struct {
// contains filtered or unexported fields
}
func (EventsBackend) SubscribeEvents ¶ added in v0.30.2
func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
type EventsResponse ¶ added in v0.30.2
type EventsResponse struct {
	BlockID flow.Identifier
	Height  uint64
	Events  flow.EventsList
}
type ExecutionDataBackend ¶ added in v0.30.2
type ExecutionDataBackend struct {
// contains filtered or unexported fields
}
func (*ExecutionDataBackend) GetExecutionDataByBlockID ¶ added in v0.30.2
func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
func (*ExecutionDataBackend) SubscribeExecutionData ¶ added in v0.30.2
func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) Subscription
type ExecutionDataResponse ¶ added in v0.30.2
type ExecutionDataResponse struct {
	Height        uint64
	ExecutionData *execution_data.BlockExecutionData
}
type GetDataByHeightFunc ¶ added in v0.30.2
GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height.
Expected errors:
- storage.ErrNotFound
- execution_data.BlobNotFoundError
All other errors are considered exceptions.
type GetExecutionDataFunc ¶ added in v0.30.2
type GetExecutionDataFunc func(context.Context, flow.Identifier) (*execution_data.BlockExecutionDataEntity, error)
type GetStartHeightFunc ¶ added in v0.30.2
type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
func NewHandler ¶
func (*Handler) GetExecutionDataByBlockID ¶
func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *access.GetExecutionDataByBlockIDRequest) (*access.GetExecutionDataByBlockIDResponse, error)
func (*Handler) SubscribeEvents ¶ added in v0.30.2
func (h *Handler) SubscribeEvents(request *access.SubscribeEventsRequest, stream access.ExecutionDataAPI_SubscribeEventsServer) error
func (*Handler) SubscribeExecutionData ¶ added in v0.30.2
func (h *Handler) SubscribeExecutionData(request *access.SubscribeExecutionDataRequest, stream access.ExecutionDataAPI_SubscribeExecutionDataServer) error
type HeightBasedSubscription ¶ added in v0.30.2
type HeightBasedSubscription struct {
	*SubscriptionImpl
	// contains filtered or unexported fields
}
HeightBasedSubscription is a subscription that retrieves data sequentially by block height
func NewHeightBasedSubscription ¶ added in v0.30.2
func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription
type ParsedEvent ¶ added in v0.30.2
type ParsedEvent struct {
	Type         ParsedEventType
	EventType    flow.EventType
	Address      string
	Contract     string
	ContractName string
	Name         string
}
func ParseEvent ¶ added in v0.30.2
func ParseEvent(eventType flow.EventType) (*ParsedEvent, error)
ParseEvent parses an event type into its parts. There are 2 valid EventType formats:
- flow.[EventName]
- A.[Address].[Contract].[EventName]
Any other format results in an error.
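The two formats can be told apart by splitting on dots, as in the sketch below. It is an illustration of the format rules only, not the package's parser: `parseEventType` and `parsedEvent` are hypothetical names, and the sketch does not validate the address or populate every field of ParsedEvent.

```go
package main

import (
	"fmt"
	"strings"
)

// parsedEvent is a hypothetical, reduced stand-in for ParsedEvent.
type parsedEvent struct {
	Protocol bool // true for the flow.[EventName] format
	Address  string
	Contract string
	Name     string
}

// parseEventType splits an event type string into its parts.
// It accepts flow.[EventName] and A.[Address].[Contract].[EventName] only.
func parseEventType(eventType string) (*parsedEvent, error) {
	parts := strings.Split(eventType, ".")
	for _, p := range parts {
		if p == "" {
			return nil, fmt.Errorf("invalid event type: %q", eventType)
		}
	}

	switch {
	case len(parts) == 2 && parts[0] == "flow":
		return &parsedEvent{Protocol: true, Name: parts[1]}, nil
	case len(parts) == 4 && parts[0] == "A":
		return &parsedEvent{Address: parts[1], Contract: parts[2], Name: parts[3]}, nil
	}
	return nil, fmt.Errorf("invalid event type: %q", eventType)
}

func main() {
	inputs := []string{
		"flow.AccountCreated",
		"A.0123456789abcdef.FlowToken.TokensDeposited",
		"bogus",
	}
	for _, s := range inputs {
		p, err := parseEventType(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%+v\n", *p)
	}
}
```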
type ParsedEventType ¶ added in v0.30.2
type ParsedEventType int
const (
	ProtocolEventType ParsedEventType = iota + 1
	AccountEventType
)
type StateStreamBackend ¶
type StateStreamBackend struct {
	ExecutionDataBackend
	EventsBackend
	// contains filtered or unexported fields
}
func New ¶
func New(
	log zerolog.Logger,
	config Config,
	state protocol.State,
	headers storage.Headers,
	seals storage.Seals,
	results storage.ExecutionResults,
	execDataStore execution_data.ExecutionDataStore,
	execDataCache *herocache.BlockExecutionData,
	broadcaster *engine.Broadcaster,
) (*StateStreamBackend, error)
type Streamable ¶ added in v0.30.2
type Streamable interface {
	ID() string
	Close()
	Fail(error)
	Send(context.Context, interface{}, time.Duration) error
	Next(context.Context) (interface{}, error)
}
Streamable represents a subscription that can be streamed.
type Streamer ¶ added in v0.30.2
type Streamer struct {
// contains filtered or unexported fields
}
Streamer streams data for a Streamable subscription.
func NewStreamer ¶ added in v0.30.2
func NewStreamer(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	sub Streamable,
) *Streamer
type Subscription ¶ added in v0.30.2
type Subscription interface {
	// ID returns the unique identifier for this subscription used for logging
	ID() string
	// Channel returns the channel from which subscription data can be read
	Channel() <-chan interface{}
	// Err returns the error that caused the subscription to fail
	Err() error
}
Subscription represents a streaming request, and handles the communication between the gRPC handler and the backend implementation.
type SubscriptionImpl ¶ added in v0.30.2
type SubscriptionImpl struct {
// contains filtered or unexported fields
}
func NewSubscription ¶ added in v0.30.2
func NewSubscription(bufferSize int) *SubscriptionImpl
func (*SubscriptionImpl) Channel ¶ added in v0.30.2
func (sub *SubscriptionImpl) Channel() <-chan interface{}
Channel returns the channel from which subscription data can be read
func (*SubscriptionImpl) Close ¶ added in v0.30.2
func (sub *SubscriptionImpl) Close()
Close is called when a subscription ends gracefully, and closes the subscription channel
func (*SubscriptionImpl) Err ¶ added in v0.30.2
func (sub *SubscriptionImpl) Err() error
Err returns the error that caused the subscription to fail
func (*SubscriptionImpl) Fail ¶ added in v0.30.2
func (sub *SubscriptionImpl) Fail(err error)
Fail registers an error and closes the subscription channel
func (*SubscriptionImpl) ID ¶ added in v0.30.2
func (sub *SubscriptionImpl) ID() string
ID returns the subscription ID. Note: this is not a cryptographic hash.