state_stream

package
v0.32.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 19, 2023 License: AGPL-3.0 Imports: 27 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time.
	DefaultMaxGlobalStreams = 1000

	// DefaultCacheSize defines the default max number of objects for the execution data cache.
	DefaultCacheSize = 100

	// DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout
	// expires, the connection is closed.
	DefaultSendTimeout = 30 * time.Second

	// DefaultResponseLimit is default max responses per second allowed on a stream. After exceeding
	// the limit, the stream is paused until more capacity is available.
	DefaultResponseLimit = float64(0)
)
View Source
const (
	// DefaultMaxEventTypes is the default maximum number of event types that can be specified in a filter
	DefaultMaxEventTypes = 1000

	// DefaultMaxAddresses is the default maximum number of addresses that can be specified in a filter
	DefaultMaxAddresses = 1000

	// DefaultMaxContracts is the default maximum number of contracts that can be specified in a filter
	DefaultMaxContracts = 1000
)
View Source
const DefaultSendBufferSize = 10

DefaultSendBufferSize is the default buffer size for the subscription's send channel. The size is chosen to balance memory overhead from each subscription with performance when streaming existing data.

Variables

View Source
var DefaultEventFilterConfig = EventFilterConfig{
	MaxEventTypes: DefaultMaxEventTypes,
	MaxAddresses:  DefaultMaxAddresses,
	MaxContracts:  DefaultMaxContracts,
}

DefaultEventFilterConfig is the default configuration for EventFilters

Functions

This section is empty.

Types

type API

type API interface {
	GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)
	SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startBlockHeight uint64) Subscription
	SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription
}

type Config

type Config struct {
	EventFilterConfig

	// ListenAddr is the address the GRPC server will listen on as host:port
	ListenAddr string

	// MaxExecutionDataMsgSize is the max message size for block execution data API
	MaxExecutionDataMsgSize uint

	// RpcMetricsEnabled specifies whether to enable the GRPC metrics
	RpcMetricsEnabled bool

	// MaxGlobalStreams defines the global max number of streams that can be open at the same time.
	MaxGlobalStreams uint32

	// ExecutionDataCacheSize is the max number of objects for the execution data cache.
	ExecutionDataCacheSize uint32

	// ClientSendTimeout is the timeout for sending a message to the client. After the timeout,
	// the stream is closed with an error.
	ClientSendTimeout time.Duration

	// ClientSendBufferSize is the size of the response buffer for sending messages to the client.
	ClientSendBufferSize uint

	// ResponseLimit is the max responses per second allowed on a stream. After exceeding the limit,
	// the stream is paused until more capacity is available. Searches of past data can be CPU
	// intensive, so this helps manage the impact.
	ResponseLimit float64
}

Config defines the configurable options for the ingress server.

type Engine

type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

Engine exposes the server with the state stream API. By default, this engine is not enabled. In order to run this engine a port for the GRPC server to be served on should be specified in the run config.

func NewEng

func NewEng(
	log zerolog.Logger,
	config Config,
	execDataCache *cache.ExecutionDataCache,
	headers storage.Headers,
	chainID flow.ChainID,
	server *grpcserver.GrpcServer,
	backend *StateStreamBackend,
	broadcaster *engine.Broadcaster,
) (*Engine, error)

NewEng returns a new ingress server.

func (*Engine) OnExecutionData added in v0.30.2

func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)

OnExecutionData is called to notify the engine when a new execution data is received. The caller must guarantee that execution data is locally available for all blocks with heights between the initialBlockHeight provided during startup and the block height of the execution data provided.

type EventFilter added in v0.30.2

type EventFilter struct {
	EventTypes map[flow.EventType]struct{}
	Addresses  map[string]struct{}
	Contracts  map[string]struct{}
	// contains filtered or unexported fields
}

EventFilter represents a filter applied to events for a given subscription

func NewEventFilter added in v0.30.2

func NewEventFilter(
	config EventFilterConfig,
	chain flow.Chain,
	eventTypes []string,
	addresses []string,
	contracts []string,
) (EventFilter, error)

func (*EventFilter) Filter added in v0.30.2

func (f *EventFilter) Filter(events flow.EventsList) flow.EventsList

Filter applies the all filters on the provided list of events, and returns a list of events that match

func (*EventFilter) Match added in v0.30.2

func (f *EventFilter) Match(event flow.Event) bool

Match applies all filters to a specific event, and returns true if the event matches

type EventFilterConfig added in v0.30.2

type EventFilterConfig struct {
	MaxEventTypes int
	MaxAddresses  int
	MaxContracts  int
}

EventFilterConfig is used to configure the limits for EventFilters

type EventsBackend added in v0.30.2

type EventsBackend struct {
	// contains filtered or unexported fields
}

func (EventsBackend) SubscribeEvents added in v0.30.2

func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription

type EventsResponse added in v0.30.2

type EventsResponse struct {
	BlockID flow.Identifier
	Height  uint64
	Events  flow.EventsList
}

type ExecutionDataBackend added in v0.30.2

type ExecutionDataBackend struct {
	// contains filtered or unexported fields
}

func (*ExecutionDataBackend) GetExecutionDataByBlockID added in v0.30.2

func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)

func (*ExecutionDataBackend) SubscribeExecutionData added in v0.30.2

func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) Subscription

type ExecutionDataResponse added in v0.30.2

type ExecutionDataResponse struct {
	Height        uint64
	ExecutionData *execution_data.BlockExecutionData
}

type GetDataByHeightFunc added in v0.30.2

type GetDataByHeightFunc func(ctx context.Context, height uint64) (interface{}, error)

GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: - storage.ErrNotFound - execution_data.BlobNotFoundError All other errors are considered exceptions

type GetExecutionDataFunc added in v0.30.2

type GetExecutionDataFunc func(context.Context, uint64) (*execution_data.BlockExecutionDataEntity, error)

type GetStartHeightFunc added in v0.30.2

type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func NewHandler

func NewHandler(api API, chain flow.Chain, conf EventFilterConfig, maxGlobalStreams uint32) *Handler

func (*Handler) GetRegisterValues added in v0.32.2

func (*Handler) SubscribeEvents added in v0.30.2

func (*Handler) SubscribeExecutionData added in v0.30.2

type HeightBasedSubscription added in v0.30.2

type HeightBasedSubscription struct {
	*SubscriptionImpl
	// contains filtered or unexported fields
}

HeightBasedSubscription is a subscription that retrieves data sequentially by block height

func NewHeightBasedSubscription added in v0.30.2

func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription

func (*HeightBasedSubscription) Next added in v0.30.2

func (s *HeightBasedSubscription) Next(ctx context.Context) (interface{}, error)

Next returns the value for the next height from the subscription

type ParsedEvent added in v0.30.2

type ParsedEvent struct {
	Type         ParsedEventType
	EventType    flow.EventType
	Address      string
	Contract     string
	ContractName string
	Name         string
}

func ParseEvent added in v0.30.2

func ParseEvent(eventType flow.EventType) (*ParsedEvent, error)

ParseEvent parses an event type into its parts. There are 2 valid EventType formats: - flow.[EventName] - A.[Address].[Contract].[EventName] Any other format results in an error.

type ParsedEventType added in v0.30.2

type ParsedEventType int
const (
	ProtocolEventType ParsedEventType = iota + 1
	AccountEventType
)

type StateStreamBackend

type StateStreamBackend struct {
	ExecutionDataBackend
	EventsBackend
	// contains filtered or unexported fields
}

func New

func New(
	log zerolog.Logger,
	config Config,
	state protocol.State,
	headers storage.Headers,
	seals storage.Seals,
	results storage.ExecutionResults,
	execDataStore execution_data.ExecutionDataStore,
	execDataCache *cache.ExecutionDataCache,
	broadcaster *engine.Broadcaster,
	rootHeight uint64,
	highestAvailableHeight uint64,
) (*StateStreamBackend, error)

type Streamable added in v0.30.2

type Streamable interface {
	ID() string
	Close()
	Fail(error)
	Send(context.Context, interface{}, time.Duration) error
	Next(context.Context) (interface{}, error)
}

Streamable represents a subscription that can be streamed.

type Streamer added in v0.30.2

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer

func NewStreamer added in v0.30.2

func NewStreamer(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	limit float64,
	sub Streamable,
) *Streamer

func (*Streamer) Stream added in v0.30.2

func (s *Streamer) Stream(ctx context.Context)

Stream is a blocking method that streams data to the subscription until either the context is cancelled or it encounters an error.

type Subscription added in v0.30.2

type Subscription interface {
	// ID returns the unique identifier for this subscription used for logging
	ID() string

	// Channel returns the channel from which subscription data can be read
	Channel() <-chan interface{}

	// Err returns the error that caused the subscription to fail
	Err() error
}

Subscription represents a streaming request, and handles the communication between the grpc handler and the backend implementation.

type SubscriptionImpl added in v0.30.2

type SubscriptionImpl struct {
	// contains filtered or unexported fields
}

func NewFailedSubscription added in v0.31.0

func NewFailedSubscription(err error, msg string) *SubscriptionImpl

NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.

func NewSubscription added in v0.30.2

func NewSubscription(bufferSize int) *SubscriptionImpl

func (*SubscriptionImpl) Channel added in v0.30.2

func (sub *SubscriptionImpl) Channel() <-chan interface{}

Channel returns the channel from which subscription data can be read

func (*SubscriptionImpl) Close added in v0.30.2

func (sub *SubscriptionImpl) Close()

Close is called when a subscription ends gracefully, and closes the subscription channel

func (*SubscriptionImpl) Err added in v0.30.2

func (sub *SubscriptionImpl) Err() error

Err returns the error that caused the subscription to fail

func (*SubscriptionImpl) Fail added in v0.30.2

func (sub *SubscriptionImpl) Fail(err error)

Fail registers an error and closes the subscription channel

func (*SubscriptionImpl) ID added in v0.30.2

func (sub *SubscriptionImpl) ID() string

ID returns the subscription ID Note: this is not a cryptographic hash

func (*SubscriptionImpl) Send added in v0.30.2

func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error

Send sends a value to the subscription channel or returns an error Expected errors: - context.DeadlineExceeded if send timed out - context.Canceled if the client disconnected

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL