Documentation ¶
Index ¶
- func NewBloomAllocator[I core.BlockId](config *Config) func() filter.Filter[I]
- type BatchBlockReceiver
- type BatchBlockSender
- type BatchSinkOrchestrator
- func (bro *BatchSinkOrchestrator) IsClosed() bool
- func (bro *BatchSinkOrchestrator) IsRequester() bool
- func (bro *BatchSinkOrchestrator) IsSafeStateToClose() bool
- func (bro *BatchSinkOrchestrator) Notify(event core.SessionEvent) error
- func (bro *BatchSinkOrchestrator) ShouldFlush() bool
- func (bro *BatchSinkOrchestrator) State() BatchState
- type BatchSourceOrchestrator
- func (bso *BatchSourceOrchestrator) IsClosed() bool
- func (bso *BatchSourceOrchestrator) IsRequester() bool
- func (bso *BatchSourceOrchestrator) IsSafeStateToClose() bool
- func (bso *BatchSourceOrchestrator) Notify(event core.SessionEvent) error
- func (bso *BatchSourceOrchestrator) ShouldFlush() bool
- func (bso *BatchSourceOrchestrator) State() BatchState
- type BatchState
- type Config
- type GenericBatchSinkConnection
- func (conn *GenericBatchSinkConnection[I, R]) DeferredBatchSender() core.StatusSender[I]
- func (conn *GenericBatchSinkConnection[I, R]) DeferredSender() core.StatusSender[I]
- func (conn *GenericBatchSinkConnection[I, R]) PendingResponse() *messages.StatusMessage[I, R]
- func (conn *GenericBatchSinkConnection[I, R]) Receiver(session *core.SinkSession[I, BatchState]) *SimpleBatchBlockReceiver[I]
- func (conn *GenericBatchSinkConnection[I, R]) Sender(statusSender core.StatusSender[I]) core.StatusSender[I]
- func (conn *GenericBatchSinkConnection[I, R]) Session(store core.BlockStore[I], accumulator core.StatusAccumulator[I], ...) *core.SinkSession[I, BatchState]
- type GenericBatchSourceConnection
- func (conn *GenericBatchSourceConnection[I, R]) DeferredBatchSender() BatchBlockSender[I]
- func (conn *GenericBatchSourceConnection[I, R]) DeferredSender(maxBlocksPerRound uint32) core.BlockSender[I]
- func (conn *GenericBatchSourceConnection[I, R]) IsRequester() bool
- func (conn *GenericBatchSourceConnection[I, R]) PendingResponse() *messages.BlocksMessage[I, R]
- func (conn *GenericBatchSourceConnection[I, R]) Receiver(session *core.SourceSession[I, BatchState]) *SimpleBatchStatusReceiver[I]
- func (conn *GenericBatchSourceConnection[I, R]) Sender(batchSender BatchBlockSender[I], maxBlocksPerRound uint32) core.BlockSender[I]
- func (conn *GenericBatchSourceConnection[I, R]) Session(store core.BlockStore[I], filter filter.Filter[I], requester bool) *core.SourceSession[I, BatchState]
- type ResponseBatchBlockSender
- type ResponseStatusSender
- type SessionId
- type SimpleBatchBlockReceiver
- type SimpleBatchBlockSender
- func (sbbs *SimpleBatchBlockSender[I]) Close() error
- func (sbbs *SimpleBatchBlockSender[I]) Flush() error
- func (sbbs *SimpleBatchBlockSender[I]) Len() int
- func (sbbs *SimpleBatchBlockSender[I]) Orchestrator() core.Orchestrator[BatchState]
- func (sbbs *SimpleBatchBlockSender[I]) SendBlock(block core.RawBlock[I]) error
- type SimpleBatchStatusReceiver
- type SinkResponder
- func (sr *SinkResponder[I, R]) AllDone()
- func (sr *SinkResponder[I, R]) Receiver(sessionId SessionId) *SimpleBatchBlockReceiver[I]
- func (sr *SinkResponder[I, R]) SetStatusSender(statusSender core.StatusSender[I])
- func (sr *SinkResponder[I, R]) SinkConnection(sessionId SessionId) *GenericBatchSinkConnection[I, R]
- func (sr *SinkResponder[I, R]) SinkSession(sessionId SessionId) *core.SinkSession[I, BatchState]
- func (sr *SinkResponder[I, R]) SinkSessionData(sessionId SessionId) *SinkSessionData[I, R]
- func (sr *SinkResponder[I, R]) SinkSessionIds() []SessionId
- type SinkSessionData
- type SourceResponder
- func (sr *SourceResponder[I, R]) AllDone()
- func (sr *SourceResponder[I, R]) Receiver(sessionId SessionId) *SimpleBatchStatusReceiver[I]
- func (sr *SourceResponder[I, R]) SetBatchBlockSender(batchBlockSender BatchBlockSender[I])
- func (sr *SourceResponder[I, R]) SourceConnection(sessionId SessionId) *GenericBatchSourceConnection[I, R]
- func (sr *SourceResponder[I, R]) SourceSession(sessionId SessionId) *core.SourceSession[I, BatchState]
- func (sr *SourceResponder[I, R]) SourceSessionData(sessionId SessionId) *SourceSessionData[I, R]
- func (sr *SourceResponder[I, R]) SourceSessionIds() []SessionId
- type SourceSessionData
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BatchBlockReceiver ¶
type BatchBlockReceiver[I core.BlockId] interface { // HandleList handles a list of blocks. HandleList([]core.RawBlock[I]) error }
BatchBlockReceiver is an interface for receiving batches of blocks.
type BatchBlockSender ¶
type BatchBlockSender[I core.BlockId] interface { // SendList sends a list of blocks. SendList([]core.RawBlock[I]) error // Close closes the sender. Close() error }
BatchBlockSender is an interface for sending batches of blocks.
type BatchSinkOrchestrator ¶
type BatchSinkOrchestrator struct {
// contains filtered or unexported fields
}
BatchSinkOrchestrator is an orchestrator for receiving batches of blocks.
func NewBatchSinkOrchestrator ¶
func NewBatchSinkOrchestrator(requester bool) *BatchSinkOrchestrator
NewBatchSinkOrchestrator creates a new BatchSinkOrchestrator.
func (*BatchSinkOrchestrator) IsClosed ¶
func (bro *BatchSinkOrchestrator) IsClosed() bool
IsClosed returns true if the receiver is closed.
func (*BatchSinkOrchestrator) IsRequester ¶
func (bro *BatchSinkOrchestrator) IsRequester() bool
IsRequester returns true if this is a requester.
func (*BatchSinkOrchestrator) IsSafeStateToClose ¶
func (bro *BatchSinkOrchestrator) IsSafeStateToClose() bool
func (*BatchSinkOrchestrator) Notify ¶
func (bro *BatchSinkOrchestrator) Notify(event core.SessionEvent) error
Notify notifies the orchestrator of a session event, updating the state as appropriate.
func (*BatchSinkOrchestrator) ShouldFlush ¶
func (bro *BatchSinkOrchestrator) ShouldFlush() bool
ShouldFlush returns true if either:
- We are requester and we haven't already flushed and are therefore waiting for a response, OR
- We are responder and have received a request and are handling it, meaning we haven't responded yet.
func (*BatchSinkOrchestrator) State ¶
func (bro *BatchSinkOrchestrator) State() BatchState
State returns the current state of the orchestrator.
type BatchSourceOrchestrator ¶
type BatchSourceOrchestrator struct {
// contains filtered or unexported fields
}
BatchSourceOrchestrator is an orchestrator for sending batches of blocks.
func NewBatchSourceOrchestrator ¶
func NewBatchSourceOrchestrator(requester bool) *BatchSourceOrchestrator
NewBatchSourceOrchestrator creates a new BatchSourceOrchestrator.
func (*BatchSourceOrchestrator) IsClosed ¶
func (bso *BatchSourceOrchestrator) IsClosed() bool
IsClosed returns true if the sender is closed.
func (*BatchSourceOrchestrator) IsRequester ¶
func (bso *BatchSourceOrchestrator) IsRequester() bool
IsRequester returns true if this is a requester.
func (*BatchSourceOrchestrator) IsSafeStateToClose ¶
func (bso *BatchSourceOrchestrator) IsSafeStateToClose() bool
func (*BatchSourceOrchestrator) Notify ¶
func (bso *BatchSourceOrchestrator) Notify(event core.SessionEvent) error
Notify notifies the orchestrator of a session event, updating the state as appropriate.
func (*BatchSourceOrchestrator) ShouldFlush ¶
func (bso *BatchSourceOrchestrator) ShouldFlush() bool
ShouldFlush returns true if either:
- We are requester and we haven't already flushed and are therefore waiting for a response, OR
- We are responder and have received a request and are handling it, meaning we haven't responded yet.
func (*BatchSourceOrchestrator) State ¶
func (bso *BatchSourceOrchestrator) State() BatchState
State returns the current state of the orchestrator.
type BatchState ¶
type BatchState uint32
BatchState is a bitfield that describes the state of a batch session.
const ( SINK_CLOSING BatchState = 1 << iota SINK_CLOSED SINK_PROCESSING // Sink is processing a batch of blocks SINK_FLUSHING // Sink is in the process of sending a status message SINK_WAITING // Sink is waiting for a batch of blocks SINK_ENQUEUEING // Sink is processing a block id that has been explicitly requested SINK_SENDING // Sink may have status pending flush SINK_RECEIVING // Sink is receiving blocks SINK_HANDLING_BATCH SOURCE_PROCESSING // Source is processing a batch of blocks. Technically it's also being used to signal that we're *ready* to process, such that BEGIN_PROCESSING will let processing happen. SOURCE_FLUSHING // Source is flushing blocks SOURCE_WAITING // Source is waiting for a status message SOURCE_ENQUEUEING // Source is processing a block id that has been explicitly requested SOURCE_SENDING // Source has unflushed blocks SOURCE_RECEIVING // Source is receiving blocks SOURCE_HANDLING_BATCH SOURCE_CLOSING SOURCE_CLOSED CANCELLED SINK = SINK_CLOSING | CANCELLED | SINK_PROCESSING | SINK_FLUSHING | SINK_WAITING | SINK_CLOSED | SINK_SENDING | SINK_ENQUEUEING | SINK_RECEIVING | SINK_HANDLING_BATCH SOURCE = SOURCE_CLOSING | CANCELLED | SOURCE_PROCESSING | SOURCE_FLUSHING | SOURCE_WAITING | SOURCE_CLOSED | SOURCE_SENDING | SOURCE_ENQUEUEING | SOURCE_RECEIVING | SOURCE_HANDLING_BATCH )
Named constants for BatchState flags.
func (BatchState) String ¶
func (bs BatchState) String() string
String returns a string describing the given BatchState. Each state flag is separated by a pipe character.
func (BatchState) Strings ¶
func (bs BatchState) Strings() []string
Strings returns a slice of strings describing the given BatchState.
type Config ¶
type Config struct { MaxBlocksPerRound uint32 MaxBlocksPerColdCall uint32 BloomCapacity uint BloomFunction uint64 Instrument instrumented.InstrumentationOptions }
func DefaultConfig ¶
func DefaultConfig() Config
type GenericBatchSinkConnection ¶
type GenericBatchSinkConnection[I core.BlockId, R core.BlockIdRef[I]] struct { core.Orchestrator[BatchState] // contains filtered or unexported fields }
func NewGenericBatchSinkConnection ¶
func NewGenericBatchSinkConnection[I core.BlockId, R core.BlockIdRef[I]](stats stats.Stats, instrument instrumented.InstrumentationOptions, maxBlocksPerRound uint32, requester bool) *GenericBatchSinkConnection[I, R]
func (*GenericBatchSinkConnection[I, R]) DeferredBatchSender ¶
func (conn *GenericBatchSinkConnection[I, R]) DeferredBatchSender() core.StatusSender[I]
func (*GenericBatchSinkConnection[I, R]) DeferredSender ¶
func (conn *GenericBatchSinkConnection[I, R]) DeferredSender() core.StatusSender[I]
func (*GenericBatchSinkConnection[I, R]) PendingResponse ¶
func (conn *GenericBatchSinkConnection[I, R]) PendingResponse() *messages.StatusMessage[I, R]
func (*GenericBatchSinkConnection[I, R]) Receiver ¶
func (conn *GenericBatchSinkConnection[I, R]) Receiver(session *core.SinkSession[I, BatchState]) *SimpleBatchBlockReceiver[I]
func (*GenericBatchSinkConnection[I, R]) Sender ¶
func (conn *GenericBatchSinkConnection[I, R]) Sender(statusSender core.StatusSender[I]) core.StatusSender[I]
TODO: This really just instruments now, nothing else.
func (*GenericBatchSinkConnection[I, R]) Session ¶
func (conn *GenericBatchSinkConnection[I, R]) Session(store core.BlockStore[I], accumulator core.StatusAccumulator[I], requester bool) *core.SinkSession[I, BatchState]
type GenericBatchSourceConnection ¶
type GenericBatchSourceConnection[I core.BlockId, R core.BlockIdRef[I]] struct { core.Orchestrator[BatchState] // contains filtered or unexported fields }
func NewGenericBatchSourceConnection ¶
func NewGenericBatchSourceConnection[I core.BlockId, R core.BlockIdRef[I]](stats stats.Stats, instrument instrumented.InstrumentationOptions, maxBlocksPerRound uint32, maxBlocksPerColdCall uint32, requester bool) *GenericBatchSourceConnection[I, R]
func (*GenericBatchSourceConnection[I, R]) DeferredBatchSender ¶
func (conn *GenericBatchSourceConnection[I, R]) DeferredBatchSender() BatchBlockSender[I]
func (*GenericBatchSourceConnection[I, R]) DeferredSender ¶
func (conn *GenericBatchSourceConnection[I, R]) DeferredSender(maxBlocksPerRound uint32) core.BlockSender[I]
func (*GenericBatchSourceConnection[I, R]) IsRequester ¶
func (conn *GenericBatchSourceConnection[I, R]) IsRequester() bool
IsRequester returns true if the connection is a requester.
func (*GenericBatchSourceConnection[I, R]) PendingResponse ¶
func (conn *GenericBatchSourceConnection[I, R]) PendingResponse() *messages.BlocksMessage[I, R]
func (*GenericBatchSourceConnection[I, R]) Receiver ¶
func (conn *GenericBatchSourceConnection[I, R]) Receiver(session *core.SourceSession[I, BatchState]) *SimpleBatchStatusReceiver[I]
func (*GenericBatchSourceConnection[I, R]) Sender ¶
func (conn *GenericBatchSourceConnection[I, R]) Sender(batchSender BatchBlockSender[I], maxBlocksPerRound uint32) core.BlockSender[I]
func (*GenericBatchSourceConnection[I, R]) Session ¶
func (conn *GenericBatchSourceConnection[I, R]) Session(store core.BlockStore[I], filter filter.Filter[I], requester bool) *core.SourceSession[I, BatchState]
type ResponseBatchBlockSender ¶
type ResponseBatchBlockSender[I core.BlockId, R core.BlockIdRef[I]] struct { // contains filtered or unexported fields }
func (*ResponseBatchBlockSender[I, R]) Close ¶
func (bbs *ResponseBatchBlockSender[I, R]) Close() error
type ResponseStatusSender ¶
type ResponseStatusSender[I core.BlockId, R core.BlockIdRef[I]] struct { // contains filtered or unexported fields }
func (*ResponseStatusSender[I, R]) Close ¶
func (ss *ResponseStatusSender[I, R]) Close() error
func (*ResponseStatusSender[I, R]) SendStatus ¶
func (ss *ResponseStatusSender[I, R]) SendStatus(have filter.Filter[I], want []I) error
type SimpleBatchBlockReceiver ¶
SimpleBatchBlockReceiver is a simple implementation of BatchBlockReceiver.
func NewSimpleBatchBlockReceiver ¶
func NewSimpleBatchBlockReceiver[I core.BlockId](session core.BlockReceiver[I], orchestrator core.Orchestrator[BatchState]) *SimpleBatchBlockReceiver[I]
NewSimpleBatchBlockReceiver creates a new SimpleBatchBlockReceiver.
func (*SimpleBatchBlockReceiver[I]) HandleList ¶
func (sbbr *SimpleBatchBlockReceiver[I]) HandleList(list []core.RawBlock[I]) error
HandleList handles a list of raw blocks.
func (*SimpleBatchBlockReceiver[I]) Orchestrator ¶
func (sbbr *SimpleBatchBlockReceiver[I]) Orchestrator() core.Orchestrator[BatchState]
Orchestrator returns the orchestrator.
type SimpleBatchBlockSender ¶
SimpleBatchBlockSender is a simple implementation of BlockSender which wraps a BatchBlockSender.
func NewSimpleBatchBlockSender ¶
func NewSimpleBatchBlockSender[I core.BlockId](batchBlockSender BatchBlockSender[I], orchestrator core.Orchestrator[BatchState], maxBlocksPerRound uint32) *SimpleBatchBlockSender[I]
NewSimpleBatchBlockSender creates a new SimpleBatchBlockSender.
func (*SimpleBatchBlockSender[I]) Close ¶
func (sbbs *SimpleBatchBlockSender[I]) Close() error
Close closes the sender.
func (*SimpleBatchBlockSender[I]) Flush ¶
func (sbbs *SimpleBatchBlockSender[I]) Flush() error
Flush sends the current batch of blocks.
func (*SimpleBatchBlockSender[I]) Len ¶
func (sbbs *SimpleBatchBlockSender[I]) Len() int
Len returns the number of blocks waiting to be sent.
func (*SimpleBatchBlockSender[I]) Orchestrator ¶
func (sbbs *SimpleBatchBlockSender[I]) Orchestrator() core.Orchestrator[BatchState]
Orchestrator returns the orchestrator.
type SimpleBatchStatusReceiver ¶
TODO: Naming is confusing. This has BatchStatusReceiver in its name, but it doesn't implement HandleList and therefore isn't a BatchStatusReceiver. It is batch only in that it uses the orchestrator with BatchState to notify.
func NewSimpleBatchStatusReceiver ¶
func NewSimpleBatchStatusReceiver[I core.BlockId](statusReceiver core.StatusReceiver[I], orchestrator core.Orchestrator[BatchState]) *SimpleBatchStatusReceiver[I]
NewSimpleBatchStatusReceiver creates a new SimpleBatchStatusReceiver.
func (*SimpleBatchStatusReceiver[I]) HandleStatus ¶
func (sbsr *SimpleBatchStatusReceiver[I]) HandleStatus(have filter.Filter[I], want []I) error
HandleStatus handles a status update consisting of a have filter and a list of wanted block ids.
func (*SimpleBatchStatusReceiver[I]) Orchestrator ¶
func (sbsr *SimpleBatchStatusReceiver[I]) Orchestrator() core.Orchestrator[BatchState]
Orchestrator returns the orchestrator.
type SinkResponder ¶
type SinkResponder[I core.BlockId, R core.BlockIdRef[I]] struct { // contains filtered or unexported fields }
func NewSinkResponder ¶
func NewSinkResponder[I core.BlockId, R core.BlockIdRef[I]](store core.BlockStore[I], config Config, statusSender core.StatusSender[I]) *SinkResponder[I, R]
func (*SinkResponder[I, R]) AllDone ¶
func (sr *SinkResponder[I, R]) AllDone()
func (*SinkResponder[I, R]) Receiver ¶
func (sr *SinkResponder[I, R]) Receiver(sessionId SessionId) *SimpleBatchBlockReceiver[I]
func (*SinkResponder[I, R]) SetStatusSender ¶
func (sr *SinkResponder[I, R]) SetStatusSender(statusSender core.StatusSender[I])
func (*SinkResponder[I, R]) SinkConnection ¶
func (sr *SinkResponder[I, R]) SinkConnection(sessionId SessionId) *GenericBatchSinkConnection[I, R]
func (*SinkResponder[I, R]) SinkSession ¶
func (sr *SinkResponder[I, R]) SinkSession(sessionId SessionId) *core.SinkSession[I, BatchState]
func (*SinkResponder[I, R]) SinkSessionData ¶
func (sr *SinkResponder[I, R]) SinkSessionData(sessionId SessionId) *SinkSessionData[I, R]
func (*SinkResponder[I, R]) SinkSessionIds ¶
func (sr *SinkResponder[I, R]) SinkSessionIds() []SessionId
type SinkSessionData ¶
type SinkSessionData[I core.BlockId, R core.BlockIdRef[I]] struct { Session *core.SinkSession[I, BatchState] // contains filtered or unexported fields }
type SourceResponder ¶
type SourceResponder[I core.BlockId, R core.BlockIdRef[I]] struct { // contains filtered or unexported fields }
func NewSourceResponder ¶
func NewSourceResponder[I core.BlockId, R core.BlockIdRef[I]](store core.BlockStore[I], config Config, batchBlockSender BatchBlockSender[I]) *SourceResponder[I, R]
func (*SourceResponder[I, R]) AllDone ¶
func (sr *SourceResponder[I, R]) AllDone()
func (*SourceResponder[I, R]) Receiver ¶
func (sr *SourceResponder[I, R]) Receiver(sessionId SessionId) *SimpleBatchStatusReceiver[I]
func (*SourceResponder[I, R]) SetBatchBlockSender ¶
func (sr *SourceResponder[I, R]) SetBatchBlockSender(batchBlockSender BatchBlockSender[I])
func (*SourceResponder[I, R]) SourceConnection ¶
func (sr *SourceResponder[I, R]) SourceConnection(sessionId SessionId) *GenericBatchSourceConnection[I, R]
func (*SourceResponder[I, R]) SourceSession ¶
func (sr *SourceResponder[I, R]) SourceSession(sessionId SessionId) *core.SourceSession[I, BatchState]
func (*SourceResponder[I, R]) SourceSessionData ¶
func (sr *SourceResponder[I, R]) SourceSessionData(sessionId SessionId) *SourceSessionData[I, R]
func (*SourceResponder[I, R]) SourceSessionIds ¶
func (sr *SourceResponder[I, R]) SourceSessionIds() []SessionId
type SourceSessionData ¶
type SourceSessionData[I core.BlockId, R core.BlockIdRef[I]] struct { Session *core.SourceSession[I, BatchState] // contains filtered or unexported fields }