engine

package
v0.22.9-patch-1-epoch-... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2021 License: AGPL-3.0 Imports: 11 Imported by: 42

README

Notifier

The Notifier implements the following state machine Notifier State Machine

The intended usage pattern is:

  • there are goroutines, aka Producers, that append work to a queue pendingWorkQueue
  • there is a number of goroutines, aka Consumers, that pull work from the pendingWorkQueue
    • they consume work until they have drained the pendingWorkQueue
    • when they find that the pendingWorkQueue contains no more work, they go back to the notifier and await notification

Notifier Usage Pattern

Note that the consumer / producer interact in a different order with the pendingWorkQueue vs the notifier:

  • the producer first drops its work into the queue and subsequently sends the notification
  • the consumer first processes elements from the queue and subsequently checks for a notification Thereby, it is guaranteed that at least one consumer routine will be notified when work is added

Documentation

Index

Constants

View Source
const (

	// Channels used for testing
	TestNetwork = network.Channel("test-network")
	TestMetrics = network.Channel("test-metrics")

	// Channels for consensus protocols
	ConsensusCommittee = network.Channel("consensus-committee")

	// Channels for protocols actively synchronizing state across nodes
	SyncCommittee = network.Channel("sync-committee")

	SyncExecution = network.Channel("sync-execution")

	// Channels for dkg communication
	DKGCommittee = "dkg-committee"

	// Channels for actively pushing entities to subscribers
	PushTransactions = network.Channel("push-transactions")
	PushGuarantees   = network.Channel("push-guarantees")
	PushBlocks       = network.Channel("push-blocks")
	PushReceipts     = network.Channel("push-receipts")
	PushApprovals    = network.Channel("push-approvals")

	// Channels for actively requesting missing entities
	RequestCollections       = network.Channel("request-collections")
	RequestChunks            = network.Channel("request-chunks")
	RequestReceiptsByBlockID = network.Channel("request-receipts-by-block-id")
	RequestApprovalsByChunk  = network.Channel("request-approvals-by-chunk")

	// Channel aliases to make the code more readable / more robust to errors
	ReceiveTransactions = PushTransactions
	ReceiveGuarantees   = PushGuarantees
	ReceiveBlocks       = PushBlocks
	ReceiveReceipts     = PushReceipts
	ReceiveApprovals    = PushApprovals

	ProvideCollections       = RequestCollections
	ProvideChunks            = RequestChunks
	ProvideReceiptsByBlockID = RequestReceiptsByBlockID
	ProvideApprovalsByChunk  = RequestApprovalsByChunk

	// Public network channels
	PublicSyncCommittee = network.Channel("public-sync-committee")
)

channels

Variables

View Source
var (
	// IncompatibleInputTypeError indicates that the input has an incompatible type
	IncompatibleInputTypeError = errors.New("incompatible input type")
)

Functions

func ChannelConsensusCluster added in v0.9.6

func ChannelConsensusCluster(clusterID flow.ChainID) network.Channel

ChannelConsensusCluster returns a dynamic cluster consensus channel based on the chain ID of the cluster in question.

func ChannelFromTopic added in v0.22.4

func ChannelFromTopic(topic network.Topic) (network.Channel, bool)

func ChannelSyncCluster added in v0.9.6

func ChannelSyncCluster(clusterID flow.ChainID) network.Channel

ChannelSyncCluster returns a dynamic cluster sync channel based on the chain ID of the cluster in question.

func Channels added in v0.14.0

func Channels() network.ChannelList

Channels returns all channels that nodes of any role have subscribed to (except cluster-based channels).

func ChannelsByRole added in v0.14.0

func ChannelsByRole(role flow.Role) network.ChannelList

ChannelsByRole returns a list of all channels the role subscribes to (except cluster-based channels and public channels).

func ClusterChannelRoles added in v0.22.4

func ClusterChannelRoles(clusterChannel network.Channel) flow.RoleList

ClusterChannelRoles returns the list of roles that are involved in the given cluster-based channel.

func Exists added in v0.14.0

func Exists(channel network.Channel) bool

Exists returns true if the channel exists.

func IsClusterChannel added in v0.22.4

func IsClusterChannel(channel network.Channel) bool

IsClusterChannel returns true if channel is cluster-based. Currently, only collection nodes are involved in a cluster-based channels.

func IsDuplicatedEntryError added in v0.14.0

func IsDuplicatedEntryError(err error) bool

func IsInvalidInputError

func IsInvalidInputError(err error) bool

IsInvalidInputError returns whether the given error is an InvalidInputError error

func IsOutdatedInputError

func IsOutdatedInputError(err error) bool

func IsUnverifiableInputError added in v0.15.0

func IsUnverifiableInputError(err error) bool

func LogError

func LogError(log zerolog.Logger, err error)

LogError logs the engine processing error

func LogErrorWithMsg added in v0.15.0

func LogErrorWithMsg(log zerolog.Logger, msg string, err error)

func NewDuplicatedEntryErrorf added in v0.14.0

func NewDuplicatedEntryErrorf(msg string, args ...interface{}) error

func NewInvalidInputError

func NewInvalidInputError(msg string) error

func NewInvalidInputErrorf

func NewInvalidInputErrorf(msg string, args ...interface{}) error

func NewOutdatedInputErrorf

func NewOutdatedInputErrorf(msg string, args ...interface{}) error

func NewUnverifiableInputError added in v0.15.0

func NewUnverifiableInputError(msg string, args ...interface{}) error

func PublicChannels added in v0.22.4

func PublicChannels() network.ChannelList

PublicChannels returns all channels that are used on the public network.

func RolesByChannel added in v0.14.0

func RolesByChannel(channel network.Channel) (flow.RoleList, bool)

RolesByChannel returns list of flow roles involved in the channel. If the given channel is a public channel, the returned list will contain all roles.

func TopicFromChannel added in v0.14.0

func TopicFromChannel(channel network.Channel, rootBlockID flow.Identifier) network.Topic

TopicFromChannel returns the unique LibP2P topic form the channel. The channel is made up of name string suffixed with root block id. The root block id is used to prevent cross talks between nodes on different sporks.

func UniqueChannels added in v0.14.0

func UniqueChannels(channels network.ChannelList) network.ChannelList

UniqueChannels returns list of non-cluster channels with a unique RoleList accompanied with the list of all cluster channels. e.g. if channel X and Y both are non-cluster channels and have role IDs [A,B,C] then only one of them will be in the returned list.

Types

type DuplicatedEntryError added in v0.14.0

type DuplicatedEntryError struct {
	// contains filtered or unexported fields
}

func (DuplicatedEntryError) Error added in v0.14.0

func (e DuplicatedEntryError) Error() string

func (DuplicatedEntryError) Unwrap added in v0.14.0

func (e DuplicatedEntryError) Unwrap() error

type FifoMessageStore added in v0.17.2

type FifoMessageStore struct {
	*fifoqueue.FifoQueue
}

FifoMessageStore wraps a FiFo Queue to implement the MessageStore interface.

func (*FifoMessageStore) Get added in v0.17.2

func (s *FifoMessageStore) Get() (*Message, bool)

func (*FifoMessageStore) Put added in v0.17.2

func (s *FifoMessageStore) Put(msg *Message) bool

type FilterFunc added in v0.17.2

type FilterFunc func(*Message) bool

type InvalidInputError

type InvalidInputError struct {
	// contains filtered or unexported fields
}

InvalidInputError are errors for caused by invalid inputs. It's useful to distinguish these known errors from exceptions. By distinguishing errors from exceptions, we can log them differently. For instance, log InvalidInputError error as a warn log, and log other error as an error log.

func (InvalidInputError) Error

func (e InvalidInputError) Error() string

func (InvalidInputError) Unwrap

func (e InvalidInputError) Unwrap() error

type MapFunc added in v0.17.2

type MapFunc func(*Message) (*Message, bool)

type MatchFunc added in v0.17.2

type MatchFunc func(*Message) bool

type Message added in v0.17.2

type Message struct {
	OriginID flow.Identifier
	Payload  interface{}
}

type MessageHandler added in v0.17.2

type MessageHandler struct {
	// contains filtered or unexported fields
}

func NewMessageHandler added in v0.17.2

func NewMessageHandler(log zerolog.Logger, notifier Notifier, patterns ...Pattern) *MessageHandler

func (*MessageHandler) GetNotifier added in v0.17.2

func (e *MessageHandler) GetNotifier() <-chan struct{}

func (*MessageHandler) Process added in v0.17.2

func (e *MessageHandler) Process(originID flow.Identifier, payload interface{}) error

Process iterates over the internal processing patterns and determines if the payload matches. The _first_ matching pattern processes the payload. Returns

  • IncompatibleInputTypeError if no matching processor was found
  • All other errors are potential symptoms of internal state corruption or bugs (fatal).

type MessageStore added in v0.17.2

type MessageStore interface {
	Put(*Message) bool
	Get() (*Message, bool)
}

MessageStore is the interface to abstract how messages are buffered in memory before being handled by the engine

type Notifier added in v0.17.6

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier is a concurrency primitive for informing worker routines about the arrival of new work unit(s). Notifiers essentially behave like channels in that they can be passed by value and still allow concurrent updates of the same internal state.

func NewNotifier added in v0.17.6

func NewNotifier() Notifier

NewNotifier instantiates a Notifier. Notifiers essentially behave like channels in that they can be passed by value and still allow concurrent updates of the same internal state.

func (Notifier) Channel added in v0.17.6

func (n Notifier) Channel() <-chan struct{}

Channel returns a channel for receiving notifications

func (Notifier) Notify added in v0.17.6

func (n Notifier) Notify()

Notify sends a notification

type OutdatedInputError

type OutdatedInputError struct {
	// contains filtered or unexported fields
}

OutdatedInputError are for inputs that are outdated. An outdated input doesn't mean whether the input was invalid or not, knowing that would take more computation that isn't necessary. An outdated input could also for a duplicated input: the duplication is outdated.

func (OutdatedInputError) Error

func (e OutdatedInputError) Error() string

func (OutdatedInputError) Unwrap

func (e OutdatedInputError) Unwrap() error

type Pattern added in v0.17.2

type Pattern struct {
	// Match is a function to match a message to this pattern, typically by payload type.
	Match MatchFunc
	// Map is a function to apply to messages before storing them. If not provided, then the message is stored in its original form.
	Map MapFunc
	// Store is an abstract message store where we will store the message upon receipt.
	Store MessageStore
}

type Unit

type Unit struct {
	sync.Mutex // can be used to synchronize the engine
	// contains filtered or unexported fields
}

Unit handles synchronization management, startup, and shutdown for engines.

func NewUnit

func NewUnit() *Unit

NewUnit returns a new unit.

func (*Unit) Ctx

func (u *Unit) Ctx() context.Context

Ctx returns a context with the same lifecycle scope as the unit. In particular, it is cancelled when Done is called, so it can be used as the parent context for processes spawned by any engine whose lifecycle is managed by a unit.

func (*Unit) Do

func (u *Unit) Do(f func() error) error

Do synchronously executes the input function f unless the unit has shut down. It returns the result of f. If f is executed, the unit will not shut down until after f returns.

func (*Unit) Done

func (u *Unit) Done(actions ...func()) <-chan struct{}

Done returns a channel that is closed when the unit is done. A unit is done when (i) the series of "action" functions are executed and (ii) all pending functions invoked with `Do` or `Launch` have completed.

The engine using the unit is responsible for defining these action functions as required.

func (*Unit) Launch

func (u *Unit) Launch(f func())

Launch asynchronously executes the input function unless the unit has shut down. If f is executed, the unit will not shut down until after f returns.

func (*Unit) LaunchAfter

func (u *Unit) LaunchAfter(delay time.Duration, f func())

LaunchAfter asynchronously executes the input function after a certain delay unless the unit has shut down.

func (*Unit) LaunchPeriodically

func (u *Unit) LaunchPeriodically(f func(), interval time.Duration, delay time.Duration)

LaunchPeriodically asynchronously executes the input function on `interval` periods unless the unit has shut down. If f is executed, the unit will not shut down until after f returns.

func (*Unit) Quit

func (u *Unit) Quit() <-chan struct{}

Quit returns a channel that is closed when the unit begins to shut down.

func (*Unit) Ready

func (u *Unit) Ready(checks ...func()) <-chan struct{}

Ready returns a channel that is closed when the unit is ready. A unit is ready when the series of "check" functions are executed.

The engine using the unit is responsible for defining these check functions as required.

type UnverifiableInputError added in v0.15.0

type UnverifiableInputError struct {
	// contains filtered or unexported fields
}

UnverifiableInputError are for inputs that cannot be verified at this moment. Usually it means that we don't have enough data to verify it. A good example is missing data in DB to process input.

func (UnverifiableInputError) Error added in v0.15.0

func (e UnverifiableInputError) Error() string

func (UnverifiableInputError) Unwrap added in v0.15.0

func (e UnverifiableInputError) Unwrap() error

Directories

Path Synopsis
access
rpc
collection
ingest
Package ingest implements an engine for receiving transactions that need to be packaged into a collection.
Package ingest implements an engine for receiving transactions that need to be packaged into a collection.
pusher
Package pusher implements an engine for providing access to resources held by the collection node, including collections, collection guarantees, and transactions.
Package pusher implements an engine for providing access to resources held by the collection node, including collections, collection guarantees, and transactions.
common
dkg
rpc
ghost
verification

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL