batch

package
v0.0.0-...-ee0e00b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2023 License: Apache-2.0 Imports: 12 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBloomAllocator

func NewBloomAllocator[I core.BlockId](config *Config) func() filter.Filter[I]

Types

type BatchBlockReceiver

type BatchBlockReceiver[I core.BlockId] interface {
	// HandleList handles a list of blocks.
	HandleList([]core.RawBlock[I]) error
}

BatchBlockReceiver is an interface for receiving batches of blocks.

type BatchBlockSender

type BatchBlockSender[I core.BlockId] interface {
	// SendList sends a list of blocks.
	SendList([]core.RawBlock[I]) error
	// Close closes the sender.
	Close() error
}

BatchBlockSender is an interface for sending batches of blocks.

type BatchSinkOrchestrator

type BatchSinkOrchestrator struct {
	// contains filtered or unexported fields
}

BatchSinkOrchestrator is an orchestrator for receiving batches of blocks.

func NewBatchSinkOrchestrator

func NewBatchSinkOrchestrator(requester bool) *BatchSinkOrchestrator

NewBatchSinkOrchestrator creates a new BatchSinkOrchestrator.

func (*BatchSinkOrchestrator) IsClosed

func (bro *BatchSinkOrchestrator) IsClosed() bool

IsClosed returns true if the receiver is closed.

func (*BatchSinkOrchestrator) IsRequester

func (bro *BatchSinkOrchestrator) IsRequester() bool

IsRequester returns true if this is a requester.

func (*BatchSinkOrchestrator) IsSafeStateToClose

func (bro *BatchSinkOrchestrator) IsSafeStateToClose() bool

func (*BatchSinkOrchestrator) Notify

func (bro *BatchSinkOrchestrator) Notify(event core.SessionEvent) error

Notify notifies the orchestrator of a session event, updating the state as appropriate.

func (*BatchSinkOrchestrator) ShouldFlush

func (bro *BatchSinkOrchestrator) ShouldFlush() bool

ShouldFlush returns true if either:

  • We are requester and we haven't already flushed and are therefore waiting for a response, OR
  • We are responder and have received a request and are handling it, meaning we haven't responded yet.

func (*BatchSinkOrchestrator) State

func (bro *BatchSinkOrchestrator) State() BatchState

State returns the current state of the orchestrator.

type BatchSourceOrchestrator

type BatchSourceOrchestrator struct {
	// contains filtered or unexported fields
}

BatchSourceOrchestrator is an orchestrator for sending batches of blocks.

func NewBatchSourceOrchestrator

func NewBatchSourceOrchestrator(requester bool) *BatchSourceOrchestrator

NewBatchSourceOrchestrator creates a new BatchSourceOrchestrator.

func (*BatchSourceOrchestrator) IsClosed

func (bso *BatchSourceOrchestrator) IsClosed() bool

IsClosed returns true if the sender is closed.

func (*BatchSourceOrchestrator) IsRequester

func (bso *BatchSourceOrchestrator) IsRequester() bool

IsRequester returns true if this is a requester.

func (*BatchSourceOrchestrator) IsSafeStateToClose

func (bso *BatchSourceOrchestrator) IsSafeStateToClose() bool

func (*BatchSourceOrchestrator) Notify

func (bso *BatchSourceOrchestrator) Notify(event core.SessionEvent) error

Notify notifies the orchestrator of a session event, updating the state as appropriate.

func (*BatchSourceOrchestrator) ShouldFlush

func (bso *BatchSourceOrchestrator) ShouldFlush() bool

ShouldFlush returns true if either:

  • We are requester and we haven't already flushed and are therefore waiting for a response, OR
  • We are responder and have received a request and are handling it, meaning we haven't responded yet.

func (*BatchSourceOrchestrator) State

func (bso *BatchSourceOrchestrator) State() BatchState

State returns the current state of the orchestrator.

type BatchState

type BatchState uint32

BatchState is a bitfield that describes the state of a batch session.

const (
	SINK_CLOSING BatchState = 1 << iota
	SINK_CLOSED
	SINK_PROCESSING // Sink is processing a batch of blocks
	SINK_FLUSHING   // Sink is in the process of sending a status message
	SINK_WAITING    // Sink is waiting for a batch of blocks
	SINK_ENQUEUEING // Sink is processing a block id that has been explicitly requested
	SINK_SENDING    // Sink may have status pending flush
	SINK_RECEIVING  // Sink is receiving blocks
	SINK_HANDLING_BATCH
	SOURCE_PROCESSING // Source is processing a batch of blocks.  Technically it's also being used to signal that we're *ready* to process, such that BEGIN_PROCESSING will let processing happen.
	SOURCE_FLUSHING   // Source is flushing blocks
	SOURCE_WAITING    // Source is waiting for a status message
	SOURCE_ENQUEUEING // Source is processing a block id that has been explicitly requested
	SOURCE_SENDING    // Source has unflushed blocks
	SOURCE_RECEIVING  // Source is receiving blocks
	SOURCE_HANDLING_BATCH
	SOURCE_CLOSING
	SOURCE_CLOSED
	CANCELLED
	SINK   = SINK_CLOSING | CANCELLED | SINK_PROCESSING | SINK_FLUSHING | SINK_WAITING | SINK_CLOSED | SINK_SENDING | SINK_ENQUEUEING | SINK_RECEIVING | SINK_HANDLING_BATCH
	SOURCE = SOURCE_CLOSING | CANCELLED | SOURCE_PROCESSING | SOURCE_FLUSHING | SOURCE_WAITING | SOURCE_CLOSED | SOURCE_SENDING | SOURCE_ENQUEUEING | SOURCE_RECEIVING | SOURCE_HANDLING_BATCH
)

Named constants for BatchState flags.

func (BatchState) String

func (bs BatchState) String() string

String returns a string describing the given BatchState. Each state flag is separated by a pipe character.

func (BatchState) Strings

func (bs BatchState) Strings() []string

Strings returns a slice of strings describing the given BatchState.

type Config

type Config struct {
	MaxBlocksPerRound    uint32
	MaxBlocksPerColdCall uint32
	BloomCapacity        uint
	BloomFunction        uint64
	Instrument           instrumented.InstrumentationOptions
}

func DefaultConfig

func DefaultConfig() Config

type GenericBatchSinkConnection

type GenericBatchSinkConnection[I core.BlockId, R core.BlockIdRef[I]] struct {
	core.Orchestrator[BatchState]
	// contains filtered or unexported fields
}

func NewGenericBatchSinkConnection

func NewGenericBatchSinkConnection[I core.BlockId, R core.BlockIdRef[I]](stats stats.Stats, instrument instrumented.InstrumentationOptions, maxBlocksPerRound uint32, requester bool) *GenericBatchSinkConnection[I, R]

func (*GenericBatchSinkConnection[I, R]) DeferredBatchSender

func (conn *GenericBatchSinkConnection[I, R]) DeferredBatchSender() core.StatusSender[I]

func (*GenericBatchSinkConnection[I, R]) DeferredSender

func (conn *GenericBatchSinkConnection[I, R]) DeferredSender() core.StatusSender[I]

func (*GenericBatchSinkConnection[I, R]) PendingResponse

func (conn *GenericBatchSinkConnection[I, R]) PendingResponse() *messages.StatusMessage[I, R]

func (*GenericBatchSinkConnection[I, R]) Receiver

func (conn *GenericBatchSinkConnection[I, R]) Receiver(session *core.SinkSession[I, BatchState]) *SimpleBatchBlockReceiver[I]

func (*GenericBatchSinkConnection[I, R]) Sender

func (conn *GenericBatchSinkConnection[I, R]) Sender(statusSender core.StatusSender[I]) core.StatusSender[I]

TODO: This really just instruments now, nothing else.

func (*GenericBatchSinkConnection[I, R]) Session

func (conn *GenericBatchSinkConnection[I, R]) Session(store core.BlockStore[I], accumulator core.StatusAccumulator[I], requester bool) *core.SinkSession[I, BatchState]

type GenericBatchSourceConnection

type GenericBatchSourceConnection[I core.BlockId, R core.BlockIdRef[I]] struct {
	core.Orchestrator[BatchState]
	// contains filtered or unexported fields
}

func NewGenericBatchSourceConnection

func NewGenericBatchSourceConnection[I core.BlockId, R core.BlockIdRef[I]](stats stats.Stats, instrument instrumented.InstrumentationOptions, maxBlocksPerRound uint32, maxBlocksPerColdCall uint32, requester bool) *GenericBatchSourceConnection[I, R]

func (*GenericBatchSourceConnection[I, R]) DeferredBatchSender

func (conn *GenericBatchSourceConnection[I, R]) DeferredBatchSender() BatchBlockSender[I]

func (*GenericBatchSourceConnection[I, R]) DeferredSender

func (conn *GenericBatchSourceConnection[I, R]) DeferredSender(maxBlocksPerRound uint32) core.BlockSender[I]

func (*GenericBatchSourceConnection[I, R]) IsRequester

func (conn *GenericBatchSourceConnection[I, R]) IsRequester() bool

IsRequester returns true if the connection is a requester.

func (*GenericBatchSourceConnection[I, R]) PendingResponse

func (conn *GenericBatchSourceConnection[I, R]) PendingResponse() *messages.BlocksMessage[I, R]

func (*GenericBatchSourceConnection[I, R]) Receiver

func (*GenericBatchSourceConnection[I, R]) Sender

func (conn *GenericBatchSourceConnection[I, R]) Sender(batchSender BatchBlockSender[I], maxBlocksPerRound uint32) core.BlockSender[I]

func (*GenericBatchSourceConnection[I, R]) Session

func (conn *GenericBatchSourceConnection[I, R]) Session(store core.BlockStore[I], filter filter.Filter[I], requester bool) *core.SourceSession[I, BatchState]

type ResponseBatchBlockSender

type ResponseBatchBlockSender[I core.BlockId, R core.BlockIdRef[I]] struct {
	// contains filtered or unexported fields
}

func (*ResponseBatchBlockSender[I, R]) Close

func (bbs *ResponseBatchBlockSender[I, R]) Close() error

func (*ResponseBatchBlockSender[I, R]) SendList

func (bbs *ResponseBatchBlockSender[I, R]) SendList(blocks []core.RawBlock[I]) error

type ResponseStatusSender

type ResponseStatusSender[I core.BlockId, R core.BlockIdRef[I]] struct {
	// contains filtered or unexported fields
}

func (*ResponseStatusSender[I, R]) Close

func (ss *ResponseStatusSender[I, R]) Close() error

func (*ResponseStatusSender[I, R]) SendStatus

func (ss *ResponseStatusSender[I, R]) SendStatus(have filter.Filter[I], want []I) error

type SessionId

type SessionId string

type SimpleBatchBlockReceiver

type SimpleBatchBlockReceiver[I core.BlockId] struct {
	// contains filtered or unexported fields
}

SimpleBatchBlockReceiver is a simple implementation of BatchBlockReceiver.

func NewSimpleBatchBlockReceiver

func NewSimpleBatchBlockReceiver[I core.BlockId](session core.BlockReceiver[I], orchestrator core.Orchestrator[BatchState]) *SimpleBatchBlockReceiver[I]

NewSimpleBatchBlockReceiver creates a new SimpleBatchBlockReceiver.

func (*SimpleBatchBlockReceiver[I]) HandleList

func (sbbr *SimpleBatchBlockReceiver[I]) HandleList(list []core.RawBlock[I]) error

HandleList handles a list of raw blocks.

func (*SimpleBatchBlockReceiver[I]) Orchestrator

func (sbbr *SimpleBatchBlockReceiver[I]) Orchestrator() core.Orchestrator[BatchState]

Orchestrator returns the orchestrator.

type SimpleBatchBlockSender

type SimpleBatchBlockSender[I core.BlockId] struct {
	// contains filtered or unexported fields
}

SimpleBatchBlockSender is a simple implementation of BlockSender which wraps a BatchBlockSender

func NewSimpleBatchBlockSender

func NewSimpleBatchBlockSender[I core.BlockId](batchBlockSender BatchBlockSender[I], orchestrator core.Orchestrator[BatchState], maxBlocksPerRound uint32) *SimpleBatchBlockSender[I]

NewSimpleBatchBlockSender creates a new SimpleBatchBlockSender.

func (*SimpleBatchBlockSender[I]) Close

func (sbbs *SimpleBatchBlockSender[I]) Close() error

Close closes the sender.

func (*SimpleBatchBlockSender[I]) Flush

func (sbbs *SimpleBatchBlockSender[I]) Flush() error

Flush sends the current batch of blocks.

func (*SimpleBatchBlockSender[I]) Len

func (sbbs *SimpleBatchBlockSender[I]) Len() int

Len returns the number of blocks waiting to be sent.

func (*SimpleBatchBlockSender[I]) Orchestrator

func (sbbs *SimpleBatchBlockSender[I]) Orchestrator() core.Orchestrator[BatchState]

Orchestrator returns the orchestrator.

func (*SimpleBatchBlockSender[I]) SendBlock

func (sbbs *SimpleBatchBlockSender[I]) SendBlock(block core.RawBlock[I]) error

SendBlock adds a block to the batch and sends the batch if it is full.

type SimpleBatchStatusReceiver

type SimpleBatchStatusReceiver[I core.BlockId] struct {
	// contains filtered or unexported fields
}

TODO: Naming is confusing. This has BatchStatusReceiver in its name, but it doesn't implement HandleList and therefore isn't a BatchStatusReceiver. It is batch only in that it uses the orchestrator with BatchState to notify.

func NewSimpleBatchStatusReceiver

func NewSimpleBatchStatusReceiver[I core.BlockId](statusReceiver core.StatusReceiver[I], orchestrator core.Orchestrator[BatchState]) *SimpleBatchStatusReceiver[I]

NewSimpleBatchStatusReceiver creates a new SimpleBatchStatusReceiver.

func (*SimpleBatchStatusReceiver[I]) HandleStatus

func (sbsr *SimpleBatchStatusReceiver[I]) HandleStatus(have filter.Filter[I], want []I) error

HandleList handles a list of raw blocks.

func (*SimpleBatchStatusReceiver[I]) Orchestrator

func (sbsr *SimpleBatchStatusReceiver[I]) Orchestrator() core.Orchestrator[BatchState]

Orchestrator returns the orchestrator.

type SinkResponder

type SinkResponder[I core.BlockId, R core.BlockIdRef[I]] struct {
	// contains filtered or unexported fields
}

func NewSinkResponder

func NewSinkResponder[I core.BlockId, R core.BlockIdRef[I]](store core.BlockStore[I], config Config, statusSender core.StatusSender[I]) *SinkResponder[I, R]

func (*SinkResponder[I, R]) AllDone

func (sr *SinkResponder[I, R]) AllDone()

func (*SinkResponder[I, R]) Receiver

func (sr *SinkResponder[I, R]) Receiver(sessionId SessionId) *SimpleBatchBlockReceiver[I]

func (*SinkResponder[I, R]) SetStatusSender

func (sr *SinkResponder[I, R]) SetStatusSender(statusSender core.StatusSender[I])

func (*SinkResponder[I, R]) SinkConnection

func (sr *SinkResponder[I, R]) SinkConnection(sessionId SessionId) *GenericBatchSinkConnection[I, R]

func (*SinkResponder[I, R]) SinkSession

func (sr *SinkResponder[I, R]) SinkSession(sessionId SessionId) *core.SinkSession[I, BatchState]

func (*SinkResponder[I, R]) SinkSessionData

func (sr *SinkResponder[I, R]) SinkSessionData(sessionId SessionId) *SinkSessionData[I, R]

func (*SinkResponder[I, R]) SinkSessionIds

func (sr *SinkResponder[I, R]) SinkSessionIds() []SessionId

type SinkSessionData

type SinkSessionData[I core.BlockId, R core.BlockIdRef[I]] struct {
	Session *core.SinkSession[I, BatchState]
	// contains filtered or unexported fields
}

type SourceResponder

type SourceResponder[I core.BlockId, R core.BlockIdRef[I]] struct {
	// contains filtered or unexported fields
}

func NewSourceResponder

func NewSourceResponder[I core.BlockId, R core.BlockIdRef[I]](store core.BlockStore[I], config Config, batchBlockSender BatchBlockSender[I]) *SourceResponder[I, R]

func (*SourceResponder[I, R]) AllDone

func (sr *SourceResponder[I, R]) AllDone()

func (*SourceResponder[I, R]) Receiver

func (sr *SourceResponder[I, R]) Receiver(sessionId SessionId) *SimpleBatchStatusReceiver[I]

func (*SourceResponder[I, R]) SetBatchBlockSender

func (sr *SourceResponder[I, R]) SetBatchBlockSender(batchBlockSender BatchBlockSender[I])

func (*SourceResponder[I, R]) SourceConnection

func (sr *SourceResponder[I, R]) SourceConnection(sessionId SessionId) *GenericBatchSourceConnection[I, R]

func (*SourceResponder[I, R]) SourceSession

func (sr *SourceResponder[I, R]) SourceSession(sessionId SessionId) *core.SourceSession[I, BatchState]

func (*SourceResponder[I, R]) SourceSessionData

func (sr *SourceResponder[I, R]) SourceSessionData(sessionId SessionId) *SourceSessionData[I, R]

func (*SourceResponder[I, R]) SourceSessionIds

func (sr *SourceResponder[I, R]) SourceSessionIds() []SessionId

type SourceSessionData

type SourceSessionData[I core.BlockId, R core.BlockIdRef[I]] struct {
	Session *core.SourceSession[I, BatchState]
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL