Documentation ¶
Index ¶
- Constants
- Variables
- type API
- type Config
- type Engine
- type EventFilter
- type EventFilterConfig
- type EventsBackend
- type EventsResponse
- type ExecutionDataBackend
- type ExecutionDataResponse
- type GetDataByHeightFunc
- type GetExecutionDataFunc
- type GetStartHeightFunc
- type Handler
- func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *access.GetExecutionDataByBlockIDRequest) (*access.GetExecutionDataByBlockIDResponse, error)
- func (h *Handler) SubscribeEvents(request *access.SubscribeEventsRequest, ...) error
- func (h *Handler) SubscribeExecutionData(request *access.SubscribeExecutionDataRequest, ...) error
- type HeightBasedSubscription
- type ParsedEvent
- type ParsedEventType
- type StateStreamBackend
- type Streamable
- type Streamer
- type Subscription
- type SubscriptionImpl
- func (sub *SubscriptionImpl) Channel() <-chan interface{}
- func (sub *SubscriptionImpl) Close()
- func (sub *SubscriptionImpl) Err() error
- func (sub *SubscriptionImpl) Fail(err error)
- func (sub *SubscriptionImpl) ID() string
- func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error
Constants ¶
const (
	// DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time.
	DefaultMaxGlobalStreams = 1000

	// DefaultCacheSize defines the default max number of objects for the execution data cache.
	DefaultCacheSize = 100

	// DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout
	// expires, the connection is closed.
	DefaultSendTimeout = 30 * time.Second

	// DefaultResponseLimit is default max responses per second allowed on a stream. After exceeding
	// the limit, the stream is paused until more capacity is available.
	DefaultResponseLimit = float64(0)
)
const (
	// DefaultMaxEventTypes is the default maximum number of event types that can be specified in a filter
	DefaultMaxEventTypes = 1000

	// DefaultMaxAddresses is the default maximum number of addresses that can be specified in a filter
	DefaultMaxAddresses = 1000

	// DefaultMaxContracts is the default maximum number of contracts that can be specified in a filter
	DefaultMaxContracts = 1000
)
const DefaultSendBufferSize = 10
DefaultSendBufferSize is the default buffer size for the subscription's send channel. The size is chosen to balance memory overhead from each subscription with performance when streaming existing data.
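The interaction between a small send buffer and a send timeout can be illustrated with a minimal sketch. It is not the package's implementation: the send helper, the errSendTimeout sentinel and the 50 ms timeout are hypothetical names and values chosen for the illustration, and only the standard library is used.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errSendTimeout is a hypothetical sentinel error used only in this sketch.
var errSendTimeout = errors.New("timed out sending to subscriber")

// send enqueues v on ch, giving up when the timeout expires or ctx is cancelled.
func send(ctx context.Context, ch chan<- interface{}, v interface{}, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case ch <- v:
		return nil
	case <-timer.C:
		return errSendTimeout
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo fills a one-slot buffer, then attempts a second send that nobody reads.
func demo() (first, second error) {
	ch := make(chan interface{}, 1)
	ctx := context.Background()
	first = send(ctx, ch, "a", 50*time.Millisecond)
	second = send(ctx, ch, "b", 50*time.Millisecond)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first)  // <nil>
	fmt.Println(second) // timed out sending to subscriber
}
```

A larger buffer lets a slow reader fall further behind before sends start to block, at the cost of more memory per subscription, which is the trade-off the constant's description refers to.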
Variables ¶
var DefaultEventFilterConfig = EventFilterConfig{ MaxEventTypes: DefaultMaxEventTypes, MaxAddresses: DefaultMaxAddresses, MaxContracts: DefaultMaxContracts, }
DefaultEventFilterConfig is the default configuration for EventFilters
Functions ¶
This section is empty.
Types ¶
type API ¶
type API interface {
	GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
	SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startBlockHeight uint64) Subscription
	SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
}
type Config ¶
type Config struct {
	EventFilterConfig

	// ListenAddr is the address the GRPC server will listen on as host:port
	ListenAddr string

	// MaxExecutionDataMsgSize is the max message size for block execution data API
	MaxExecutionDataMsgSize uint

	// RpcMetricsEnabled specifies whether to enable the GRPC metrics
	RpcMetricsEnabled bool

	// MaxGlobalStreams defines the global max number of streams that can be open at the same time.
	MaxGlobalStreams uint32

	// ExecutionDataCacheSize is the max number of objects for the execution data cache.
	ExecutionDataCacheSize uint32

	// ClientSendTimeout is the timeout for sending a message to the client. After the timeout,
	// the stream is closed with an error.
	ClientSendTimeout time.Duration

	// ClientSendBufferSize is the size of the response buffer for sending messages to the client.
	ClientSendBufferSize uint

	// ResponseLimit is the max responses per second allowed on a stream. After exceeding the limit,
	// the stream is paused until more capacity is available. Searches of past data can be CPU
	// intensive, so this helps manage the impact.
	ResponseLimit float64
}
Config defines the configurable options for the ingress server.
type Engine ¶
type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}
Engine exposes the server with the state stream API. By default, this engine is not enabled. To run it, specify in the run config the port on which the GRPC server should be served.
func NewEng ¶
func NewEng( log zerolog.Logger, config Config, execDataStore execution_data.ExecutionDataStore, execDataCache *cache.ExecutionDataCache, state protocol.State, headers storage.Headers, seals storage.Seals, results storage.ExecutionResults, chainID flow.ChainID, initialBlockHeight uint64, highestBlockHeight uint64, apiRatelimits map[string]int, apiBurstLimits map[string]int, ) (*Engine, error)
NewEng returns a new ingress server.
func (*Engine) OnExecutionData ¶ added in v0.30.2
func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)
OnExecutionData is called to notify the engine when new execution data is received. The caller must guarantee that execution data is locally available for all blocks with heights between the initialBlockHeight provided during startup and the block height of the execution data provided.
type EventFilter ¶ added in v0.30.2
type EventFilter struct {
	EventTypes map[flow.EventType]struct{}
	Addresses  map[string]struct{}
	Contracts  map[string]struct{}
	// contains filtered or unexported fields
}
EventFilter represents a filter applied to events for a given subscription
func NewEventFilter ¶ added in v0.30.2
func NewEventFilter( config EventFilterConfig, chain flow.Chain, eventTypes []string, addresses []string, contracts []string, ) (EventFilter, error)
func (*EventFilter) Filter ¶ added in v0.30.2
func (f *EventFilter) Filter(events flow.EventsList) flow.EventsList
Filter applies all filters to the provided list of events and returns the list of events that match.
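Set-based filtering of this kind can be sketched as follows. This is a simplified illustration and not the package's implementation: the event and typeFilter types are hypothetical stand-ins, only the EventTypes set is modelled (Addresses and Contracts are left out), and the sketch assumes that an empty filter matches every event.

```go
package main

import "fmt"

// event is a hypothetical, stripped-down stand-in for a Flow event.
type event struct {
	Type string
}

// typeFilter is a hypothetical filter that models only the EventTypes set.
type typeFilter struct {
	EventTypes map[string]struct{}
}

// filter returns the events whose type is in the set.
// Assumption for this sketch: an empty set matches every event.
func (f typeFilter) filter(events []event) []event {
	if len(f.EventTypes) == 0 {
		return events
	}
	var matched []event
	for _, e := range events {
		if _, ok := f.EventTypes[e.Type]; ok {
			matched = append(matched, e)
		}
	}
	return matched
}

// demo filters three events down to the two of the requested type.
func demo() []event {
	f := typeFilter{EventTypes: map[string]struct{}{
		"flow.AccountCreated": {},
	}}
	return f.filter([]event{
		{Type: "flow.AccountCreated"},
		{Type: "A.0123456789abcdef.FlowToken.TokensDeposited"},
		{Type: "flow.AccountCreated"},
	})
}

func main() {
	for _, e := range demo() {
		fmt.Println(e.Type)
	}
}
```

Using map[...]struct{} as a set keeps each membership check constant-time, which matters when a filter holds up to the configured maximum number of entries.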
type EventFilterConfig ¶ added in v0.30.2
EventFilterConfig is used to configure the limits for EventFilters
type EventsBackend ¶ added in v0.30.2
type EventsBackend struct {
// contains filtered or unexported fields
}
func (EventsBackend) SubscribeEvents ¶ added in v0.30.2
func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
type EventsResponse ¶ added in v0.30.2
type EventsResponse struct {
	BlockID flow.Identifier
	Height  uint64
	Events  flow.EventsList
}
type ExecutionDataBackend ¶ added in v0.30.2
type ExecutionDataBackend struct {
// contains filtered or unexported fields
}
func (*ExecutionDataBackend) GetExecutionDataByBlockID ¶ added in v0.30.2
func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
func (*ExecutionDataBackend) SubscribeExecutionData ¶ added in v0.30.2
func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) Subscription
type ExecutionDataResponse ¶ added in v0.30.2
type ExecutionDataResponse struct {
	Height        uint64
	ExecutionData *execution_data.BlockExecutionData
}
type GetDataByHeightFunc ¶ added in v0.30.2
GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors:
- storage.ErrNotFound
- execution_data.BlobNotFoundError
All other errors are considered exceptions.
type GetExecutionDataFunc ¶ added in v0.30.2
type GetExecutionDataFunc func(context.Context, uint64) (*execution_data.BlockExecutionDataEntity, error)
type GetStartHeightFunc ¶ added in v0.30.2
type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
func NewHandler ¶
func (*Handler) GetExecutionDataByBlockID ¶
func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *access.GetExecutionDataByBlockIDRequest) (*access.GetExecutionDataByBlockIDResponse, error)
func (*Handler) SubscribeEvents ¶ added in v0.30.2
func (h *Handler) SubscribeEvents(request *access.SubscribeEventsRequest, stream access.ExecutionDataAPI_SubscribeEventsServer) error
func (*Handler) SubscribeExecutionData ¶ added in v0.30.2
func (h *Handler) SubscribeExecutionData(request *access.SubscribeExecutionDataRequest, stream access.ExecutionDataAPI_SubscribeExecutionDataServer) error
type HeightBasedSubscription ¶ added in v0.30.2
type HeightBasedSubscription struct {
	*SubscriptionImpl
	// contains filtered or unexported fields
}
HeightBasedSubscription is a subscription that retrieves data sequentially by block height
func NewHeightBasedSubscription ¶ added in v0.30.2
func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription
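Sequential retrieval by height can be pictured with a minimal sketch. It does not reproduce the package's code: heightSub, getDataFunc and errNotFound are hypothetical names, the callback signature is an assumption, and errNotFound merely stands in for the "not found" class of expected errors.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical stand-in for a "data not yet available" error.
var errNotFound = errors.New("not found")

// getDataFunc is a hypothetical callback type; its signature is an assumption.
type getDataFunc func(height uint64) (interface{}, error)

// heightSub is a hypothetical subscription that walks heights in order.
type heightSub struct {
	nextHeight uint64
	getData    getDataFunc
}

// next returns the data for the next height and advances only on success,
// so a failed lookup can be retried for the same height later.
func (s *heightSub) next() (interface{}, error) {
	v, err := s.getData(s.nextHeight)
	if err != nil {
		return nil, err
	}
	s.nextHeight++
	return v, nil
}

// demo streams from height 10 until the backing store runs out of data.
func demo() ([]interface{}, uint64) {
	store := map[uint64]string{10: "data@10", 11: "data@11"}
	sub := &heightSub{
		nextHeight: 10,
		getData: func(h uint64) (interface{}, error) {
			v, ok := store[h]
			if !ok {
				return nil, errNotFound
			}
			return v, nil
		},
	}

	var got []interface{}
	for {
		v, err := sub.next()
		if err != nil {
			break // caught up with the available data
		}
		got = append(got, v)
	}
	return got, sub.nextHeight
}

func main() {
	got, next := demo()
	fmt.Println(got, next)
}
```

Because the height advances only after a successful lookup, no height is skipped when data for it arrives late.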
type ParsedEvent ¶ added in v0.30.2
type ParsedEvent struct {
	Type         ParsedEventType
	EventType    flow.EventType
	Address      string
	Contract     string
	ContractName string
	Name         string
}
func ParseEvent ¶ added in v0.30.2
func ParseEvent(eventType flow.EventType) (*ParsedEvent, error)
ParseEvent parses an event type into its parts. There are 2 valid EventType formats:
- flow.[EventName]
- A.[Address].[Contract].[EventName]
Any other format results in an error.
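The two formats can be told apart by splitting on dots, as in the sketch below. It is an illustration under assumptions, not the package's ParseEvent: parsedEvent and parseEventType are hypothetical names, the struct is simplified, and the address is not validated against any chain.

```go
package main

import (
	"fmt"
	"strings"
)

// parsedEvent is a hypothetical, simplified stand-in for ParsedEvent.
type parsedEvent struct {
	Protocol bool   // true for the flow.[EventName] format
	Address  string // set only for account events
	Contract string // set only for account events
	Name     string
}

// parseEventType splits an event type string into its parts and accepts
// only the two formats listed above.
func parseEventType(eventType string) (*parsedEvent, error) {
	parts := strings.Split(eventType, ".")
	for _, p := range parts {
		if p == "" {
			return nil, fmt.Errorf("invalid event type %q: empty segment", eventType)
		}
	}

	switch {
	case len(parts) == 2 && parts[0] == "flow":
		return &parsedEvent{Protocol: true, Name: parts[1]}, nil
	case len(parts) == 4 && parts[0] == "A":
		return &parsedEvent{Address: parts[1], Contract: parts[2], Name: parts[3]}, nil
	}
	return nil, fmt.Errorf("invalid event type %q", eventType)
}

func main() {
	inputs := []string{
		"flow.AccountCreated",
		"A.0123456789abcdef.FlowToken.TokensDeposited",
		"bogus",
	}
	for _, s := range inputs {
		p, err := parseEventType(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%+v\n", *p)
	}
}
```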
type ParsedEventType ¶ added in v0.30.2
type ParsedEventType int
const (
	ProtocolEventType ParsedEventType = iota + 1
	AccountEventType
)
type StateStreamBackend ¶
type StateStreamBackend struct {
	ExecutionDataBackend
	EventsBackend
	// contains filtered or unexported fields
}
func New ¶
func New( log zerolog.Logger, config Config, state protocol.State, headers storage.Headers, seals storage.Seals, results storage.ExecutionResults, execDataStore execution_data.ExecutionDataStore, execDataCache *cache.ExecutionDataCache, broadcaster *engine.Broadcaster, rootHeight uint64, highestAvailableHeight uint64, ) (*StateStreamBackend, error)
type Streamable ¶ added in v0.30.2
type Streamable interface {
	ID() string
	Close()
	Fail(error)
	Send(context.Context, interface{}, time.Duration) error
	Next(context.Context) (interface{}, error)
}
Streamable represents a subscription that can be streamed.
type Streamer ¶ added in v0.30.2
type Streamer struct {
// contains filtered or unexported fields
}
Streamer
func NewStreamer ¶ added in v0.30.2
func NewStreamer( log zerolog.Logger, broadcaster *engine.Broadcaster, sendTimeout time.Duration, limit float64, sub Streamable, ) *Streamer
type Subscription ¶ added in v0.30.2
type Subscription interface {
	// ID returns the unique identifier for this subscription used for logging
	ID() string

	// Channel returns the channel from which subscription data can be read
	Channel() <-chan interface{}

	// Err returns the error that caused the subscription to fail
	Err() error
}
Subscription represents a streaming request, and handles the communication between the grpc handler and the backend implementation.
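A consumer of this interface would typically read from Channel until it is closed and then consult Err to learn whether the stream ended gracefully. The sketch below illustrates that pattern with a locally defined copy of the method set. The fakeSub type, the drain helper and errBackend are hypothetical and exist only for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// subscription mirrors the method set of the Subscription interface.
type subscription interface {
	ID() string
	Channel() <-chan interface{}
	Err() error
}

// errBackend is a hypothetical failure reported by the fake below.
var errBackend = errors.New("backend failed")

// fakeSub is a hypothetical in-memory implementation used only in this sketch.
type fakeSub struct {
	ch  chan interface{}
	err error
}

func (s *fakeSub) ID() string                  { return "fake-sub" }
func (s *fakeSub) Channel() <-chan interface{} { return s.ch }
func (s *fakeSub) Err() error                  { return s.err }

// drain reads until the channel is closed, then reports the subscription's error, if any.
func drain(sub subscription) ([]interface{}, error) {
	var out []interface{}
	for v := range sub.Channel() {
		out = append(out, v)
	}
	return out, sub.Err()
}

// demo delivers two items and then ends the stream with an error.
func demo() ([]interface{}, error) {
	sub := &fakeSub{ch: make(chan interface{}, 2), err: errBackend}
	sub.ch <- "block 1"
	sub.ch <- "block 2"
	close(sub.ch)
	return drain(sub)
}

func main() {
	got, err := demo()
	fmt.Println(len(got), err) // 2 backend failed
}
```

Checking Err only after the channel closes means items delivered before a failure are still processed.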
type SubscriptionImpl ¶ added in v0.30.2
type SubscriptionImpl struct {
// contains filtered or unexported fields
}
func NewFailedSubscription ¶ added in v0.31.0
func NewFailedSubscription(err error, msg string) *SubscriptionImpl
NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.
func NewSubscription ¶ added in v0.30.2
func NewSubscription(bufferSize int) *SubscriptionImpl
func (*SubscriptionImpl) Channel ¶ added in v0.30.2
func (sub *SubscriptionImpl) Channel() <-chan interface{}
Channel returns the channel from which subscription data can be read
func (*SubscriptionImpl) Close ¶ added in v0.30.2
func (sub *SubscriptionImpl) Close()
Close is called when a subscription ends gracefully, and closes the subscription channel
func (*SubscriptionImpl) Err ¶ added in v0.30.2
func (sub *SubscriptionImpl) Err() error
Err returns the error that caused the subscription to fail
func (*SubscriptionImpl) Fail ¶ added in v0.30.2
func (sub *SubscriptionImpl) Fail(err error)
Fail registers an error and closes the subscription channel
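The relationship between Close, Fail and Err can be sketched with the standard library alone. This is one possible design and not the package's implementation: the sub type and errSetup are hypothetical, and using sync.Once to make closing idempotent is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSetup is a hypothetical error raised while setting up a subscription.
var errSetup = errors.New("start height not found")

// sub is a hypothetical subscription; the real type's fields are unexported.
type sub struct {
	ch        chan interface{}
	closeOnce sync.Once // assumption: guards against closing the channel twice
	mu        sync.Mutex
	err       error
}

func newSub(bufferSize int) *sub {
	return &sub{ch: make(chan interface{}, bufferSize)}
}

// Channel returns the read side of the subscription channel.
func (s *sub) Channel() <-chan interface{} { return s.ch }

// Close ends the subscription gracefully by closing the channel.
func (s *sub) Close() {
	s.closeOnce.Do(func() { close(s.ch) })
}

// Fail records the error, then closes the channel so readers stop waiting.
func (s *sub) Fail(err error) {
	s.mu.Lock()
	s.err = err
	s.mu.Unlock()
	s.Close()
}

// Err returns the recorded error, or nil after a graceful close.
func (s *sub) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}

// demo builds a subscription that has already failed, as setup code might.
func demo() (closed bool, err error) {
	s := newSub(1)
	s.Fail(errSetup)
	s.Close() // safe: the channel is closed only once
	_, ok := <-s.Channel()
	return !ok, s.Err()
}

func main() {
	closed, err := demo()
	fmt.Println(closed, err) // true start height not found
}
```

Recording the error before closing the channel ensures that a reader who observes the closed channel also sees the error.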
func (*SubscriptionImpl) ID ¶ added in v0.30.2
func (sub *SubscriptionImpl) ID() string
ID returns the subscription ID. Note: this is not a cryptographic hash.