backend

package
v0.33.17-new-ingestion... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: AGPL-3.0 Imports: 32 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config defines the configurable options for the ingress server.
type Config struct {
	// EventFilterConfig is embedded, so its fields are promoted onto Config.
	// NOTE(review): presumably holds the limits applied to event filters — confirm in state_stream.
	state_stream.EventFilterConfig

	// ListenAddr is the address the gRPC server will listen on, in host:port form.
	ListenAddr string

	// MaxExecutionDataMsgSize is the max message size for the block execution data API.
	// NOTE(review): unit is presumably bytes — confirm against the gRPC server setup.
	MaxExecutionDataMsgSize uint

	// RpcMetricsEnabled specifies whether to enable the gRPC metrics.
	RpcMetricsEnabled bool

	// MaxGlobalStreams defines the global max number of streams that can be open at the same time.
	MaxGlobalStreams uint32

	// RegisterIDsRequestLimit defines the max number of register IDs that can be received in a single request.
	RegisterIDsRequestLimit uint32

	// ExecutionDataCacheSize is the max number of objects held in the execution data cache.
	ExecutionDataCacheSize uint32

	// ClientSendTimeout is the timeout for sending a message to the client. After the timeout,
	// the stream is closed with an error.
	ClientSendTimeout time.Duration

	// ClientSendBufferSize is the size of the response buffer for sending messages to the client.
	ClientSendBufferSize uint

	// ResponseLimit is the max number of responses per second allowed on a stream. After exceeding
	// the limit, the stream is paused until more capacity is available. Searches of past data can
	// be CPU intensive, so this helps manage the impact.
	ResponseLimit float64

	// HeartbeatInterval specifies the interval, measured in blocks, at which heartbeat messages
	// should be sent.
	HeartbeatInterval uint64
}

Config defines the configurable options for the ingress server.

type Engine

type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

Engine exposes the server with the state stream API. By default, this engine is not enabled. To run this engine, the run config must specify a port for the gRPC server to listen on.

func NewEng

func NewEng(
	log zerolog.Logger,
	config Config,
	execDataCache *cache.ExecutionDataCache,
	headers storage.Headers,
	chainID flow.ChainID,
	server *grpcserver.GrpcServer,
	backend *StateStreamBackend,
	broadcaster *engine.Broadcaster,
) (*Engine, error)

NewEng returns a new ingress server.

func (*Engine) OnExecutionData

func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity)

OnExecutionData is called to notify the engine when new execution data is received. The caller must guarantee that execution data is locally available for all blocks with heights between the initialBlockHeight provided during startup and the block height of the execution data provided.

type EventsBackend

type EventsBackend struct {
	// contains filtered or unexported fields
}

func (*EventsBackend) SubscribeEvents

func (b *EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) state_stream.Subscription

type EventsResponse

// EventsResponse groups a block ID, a height, and a list of events.
// NOTE(review): presumably the payload delivered on the subscription returned by
// EventsBackend.SubscribeEvents — confirm against the implementation.
type EventsResponse struct {
	// BlockID is presumably the ID of the block the events were emitted in — TODO confirm.
	BlockID flow.Identifier
	// Height is presumably the height of that block — TODO confirm.
	Height  uint64
	// Events is the list of events carried by this response.
	Events  flow.EventsList
}

type ExecutionDataBackend

type ExecutionDataBackend struct {
	// contains filtered or unexported fields
}

func (*ExecutionDataBackend) GetExecutionDataByBlockID

func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error)

func (*ExecutionDataBackend) SubscribeExecutionData

func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) state_stream.Subscription

type ExecutionDataResponse

// ExecutionDataResponse pairs a height with a block's execution data.
// NOTE(review): presumably the payload delivered on the subscription returned by
// ExecutionDataBackend.SubscribeExecutionData — confirm against the implementation.
type ExecutionDataResponse struct {
	// Height is presumably the height of the block the execution data belongs to — TODO confirm.
	Height        uint64
	// ExecutionData is the block execution data carried by this response.
	ExecutionData *execution_data.BlockExecutionData
}

type GetDataByHeightFunc

// GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height.
//
// Expected errors:
//   - storage.ErrNotFound
//   - execution_data.BlobNotFoundError
//
// All other errors are considered exceptions.
type GetDataByHeightFunc func(ctx context.Context, height uint64) (interface{}, error)

GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: storage.ErrNotFound and execution_data.BlobNotFoundError. All other errors are considered exceptions.

type GetStartHeightFunc

// GetStartHeightFunc takes a flow.Identifier and a uint64 and returns a uint64 height or an error.
// NOTE(review): presumably resolves a subscription's startBlockID/startHeight pair (as passed to
// SubscribeEvents and SubscribeExecutionData) to the height streaming should begin at — confirm.
type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func NewHandler

func NewHandler(api state_stream.API, chain flow.Chain, config Config) *Handler

type HeightBasedSubscription

type HeightBasedSubscription struct {
	*SubscriptionImpl
	// contains filtered or unexported fields
}

HeightBasedSubscription is a subscription that retrieves data sequentially by block height.

func NewHeightBasedSubscription

func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription

func (*HeightBasedSubscription) Next

func (s *HeightBasedSubscription) Next(ctx context.Context) (interface{}, error)

Next returns the value for the next height from the subscription.

type StateStreamBackend

type StateStreamBackend struct {
	ExecutionDataBackend
	EventsBackend
	// contains filtered or unexported fields
}

func New

func New(
	log zerolog.Logger,
	config Config,
	state protocol.State,
	headers storage.Headers,
	seals storage.Seals,
	results storage.ExecutionResults,
	execDataStore execution_data.ExecutionDataStore,
	execDataCache *cache.ExecutionDataCache,
	broadcaster *engine.Broadcaster,
	rootHeight uint64,
	highestAvailableHeight uint64,
	registers *execution.RegistersAsyncStore,
	eventsIndex *backend.EventsIndex,
	useEventsIndex bool,
) (*StateStreamBackend, error)

func (*StateStreamBackend) GetRegisterValues added in v0.32.10

func (b *StateStreamBackend) GetRegisterValues(ids flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error)

GetRegisterValues returns the register values for the given register IDs at the given block height.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer streams data to a subscription; see Stream.

func NewStreamer

func NewStreamer(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	limit float64,
	sub state_stream.Streamable,
) *Streamer

func (*Streamer) Stream

func (s *Streamer) Stream(ctx context.Context)

Stream is a blocking method that streams data to the subscription until either the context is cancelled or it encounters an error.

type SubscriptionImpl

type SubscriptionImpl struct {
	// contains filtered or unexported fields
}

func NewFailedSubscription

func NewFailedSubscription(err error, msg string) *SubscriptionImpl

NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.

func NewSubscription

func NewSubscription(bufferSize int) *SubscriptionImpl

func (*SubscriptionImpl) Channel

func (sub *SubscriptionImpl) Channel() <-chan interface{}

Channel returns the channel from which subscription data can be read

func (*SubscriptionImpl) Close

func (sub *SubscriptionImpl) Close()

Close is called when a subscription ends gracefully, and closes the subscription channel

func (*SubscriptionImpl) Err

func (sub *SubscriptionImpl) Err() error

Err returns the error that caused the subscription to fail

func (*SubscriptionImpl) Fail

func (sub *SubscriptionImpl) Fail(err error)

Fail registers an error and closes the subscription channel

func (*SubscriptionImpl) ID

func (sub *SubscriptionImpl) ID() string

ID returns the subscription ID. Note: this is not a cryptographic hash.

func (*SubscriptionImpl) Send

func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error

Send sends a value to the subscription channel or returns an error. Expected errors: context.DeadlineExceeded if the send timed out; context.Canceled if the client disconnected.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL