stream

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 12, 2024 License: LGPL-3.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cond

type Cond struct {
	// contains filtered or unexported fields
}

Cond implements a condition variable using a channel.

func NewCond

func NewCond() *Cond

func (*Cond) Broadcast

func (c *Cond) Broadcast()

func (*Cond) Wait

func (c *Cond) Wait(ctx context.Context) bool

Wait returns true if the condition is signaled, or false if the context is canceled.
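The Wait and Broadcast contract described above can be illustrated with a small channel-based sketch. The type `chanCond` and its helpers are hypothetical names, and the close-and-replace technique shown here is one common way to build such a primitive; it is not necessarily how Cond is implemented in this package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// chanCond is a hypothetical stand-in for Cond (an assumption, not the package's code).
// Broadcast closes the current channel to wake all waiters, then installs a fresh one.
type chanCond struct {
	mu sync.Mutex
	ch chan struct{}
}

func newChanCond() *chanCond {
	return &chanCond{ch: make(chan struct{})}
}

// Broadcast wakes every goroutine currently blocked in Wait.
func (c *chanCond) Broadcast() {
	c.mu.Lock()
	defer c.mu.Unlock()
	close(c.ch)
	c.ch = make(chan struct{})
}

// Wait blocks until the next Broadcast (returns true) or until ctx is done (returns false).
func (c *chanCond) Wait(ctx context.Context) bool {
	c.mu.Lock()
	ch := c.ch
	c.mu.Unlock()
	select {
	case <-ch:
		return true
	case <-ctx.Done():
		return false
	}
}

// waitSignaled reports the result of a Wait that is woken by a Broadcast.
// It broadcasts repeatedly so the waiter is woken even if it starts late.
func waitSignaled() bool {
	c := newChanCond()
	result := make(chan bool, 1)
	go func() { result <- c.Wait(context.Background()) }()
	for {
		select {
		case ok := <-result:
			return ok
		default:
			c.Broadcast()
			time.Sleep(time.Millisecond)
		}
	}
}

// waitCanceled reports the result of a Wait whose context is already canceled.
func waitCanceled() bool {
	c := newChanCond()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return c.Wait(ctx)
}

func main() {
	fmt.Println(waitSignaled()) // true
	fmt.Println(waitCanceled()) // false
}
```

Unlike `sync.Cond`, a channel-based design lets the waiter select on a context, which is what makes the boolean return value possible.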

type Queue

type Queue[V any] struct {
	// contains filtered or unexported fields
}

Queue represents a single instance of the queue data structure.

func New

func New[V any]() *Queue[V]

New constructs and returns a new Queue.

func (*Queue[V]) Add

func (q *Queue[V]) Add(elem V)

Add puts an element on the end of the queue.

func (*Queue[V]) Get

func (q *Queue[V]) Get(i int) V

Get returns the element at index i in the queue. If the index is invalid, the call will panic. This method accepts both positive and negative index values. Index 0 refers to the first element, and index -1 refers to the last.
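The index rules for Get can be sketched as follows. The `sliceQueue` type and the `getPanics` helper are hypothetical and use a plain slice for brevity; the sketch only illustrates the documented behavior of negative indices and of panicking on an invalid index, not the package's internal storage.

```go
package main

import "fmt"

// sliceQueue is a hypothetical, slice-backed stand-in for Queue[V] (an assumption).
type sliceQueue[V any] struct {
	items []V
}

// Add puts an element on the end of the queue.
func (q *sliceQueue[V]) Add(elem V) { q.items = append(q.items, elem) }

// Length returns the number of stored elements.
func (q *sliceQueue[V]) Length() int { return len(q.items) }

// Get maps a negative index i to Length()+i, then bounds-checks the result.
func (q *sliceQueue[V]) Get(i int) V {
	if i < 0 {
		i += len(q.items)
	}
	if i < 0 || i >= len(q.items) {
		panic("sliceQueue: Get called with index out of range")
	}
	return q.items[i]
}

// getPanics reports whether Get(i) panics for the given queue.
func getPanics[V any](q *sliceQueue[V], i int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	q.Get(i)
	return false
}

func main() {
	q := &sliceQueue[string]{}
	for _, s := range []string{"a", "b", "c"} {
		q.Add(s)
	}
	fmt.Println(q.Get(0), q.Get(-1), q.Get(-3))    // a c a
	fmt.Println(getPanics(q, 3), getPanics(q, -4)) // true true
}
```

Callers that cannot guarantee a valid index would typically check Length first rather than rely on recovering from the panic.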

func (*Queue[V]) GetP

func (q *Queue[V]) GetP(i int) *V

GetP returns a pointer to the element at index i in the queue. If the index is invalid, the call will panic. This method accepts both positive and negative index values. Index 0 refers to the first element, and index -1 refers to the last.

func (*Queue[V]) Length

func (q *Queue[V]) Length() int

Length returns the number of elements currently stored in the queue.

func (*Queue[V]) Peek

func (q *Queue[V]) Peek() V

Peek returns the element at the head of the queue. This call panics if the queue is empty.

func (*Queue[V]) PeekP

func (q *Queue[V]) PeekP() *V

PeekP returns a pointer to the element at the head of the queue. This call panics if the queue is empty.

func (*Queue[V]) Remove

func (q *Queue[V]) Remove() V

Remove removes and returns the element from the front of the queue. If the queue is empty, the call will panic.

func (*Queue[V]) Tail

func (q *Queue[V]) Tail() V

Tail returns the element at the tail of the queue. This call panics if the queue is empty.

func (*Queue[V]) TailP

func (q *Queue[V]) TailP() *V

TailP returns a pointer to the element at the tail of the queue. This call panics if the queue is empty.

type RPCHeader

type RPCHeader struct {
	EthHeader *ethtypes.Header
	Hash      common.Hash
}

type RPCStream

type RPCStream struct {
	// contains filtered or unexported fields
}

RPCStream provides data streams for newHeads, logs, and pendingTransactions.

func NewRPCStreams

func NewRPCStreams(
	evtClient rpcclient.EventsClient,
	logger log.Logger,
	txDecoder sdk.TxDecoder,
) (*RPCStream, error)

func (*RPCStream) Close

func (s *RPCStream) Close() error

func (*RPCStream) HeaderStream

func (s *RPCStream) HeaderStream() *Stream[RPCHeader]

func (*RPCStream) ListenPendingTx

func (s *RPCStream) ListenPendingTx(hash common.Hash)

ListenPendingTx is a callback passed to the application to listen for pending transactions in CheckTx.

func (*RPCStream) LogStream

func (s *RPCStream) LogStream() *Stream[*ethtypes.Log]

func (*RPCStream) PendingTxStream

func (s *RPCStream) PendingTxStream() *Stream[common.Hash]

type Stream

type Stream[V any] struct {
	// contains filtered or unexported fields
}

Stream implements a data stream that users can subscribe to in a blocking or non-blocking way using offsets. It stores items in a segmented ring buffer with a fixed capacity; when the buffer is full, the oldest data is pruned as new data arrives.

func NewStream

func NewStream[V any](segmentSize, capacity int) *Stream[V]

func (*Stream[V]) Add

func (s *Stream[V]) Add(vs ...V) int

Add appends items to the stream and returns the id of the last one. Item ids start at 1.
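The id numbering and capacity rule can be modeled with a short sketch. The `boundedLog` type is hypothetical and uses a plain slice instead of a segmented ring buffer. It also assumes that ids keep increasing after old items are pruned and that pruning happens item by item; the documentation does not state either point.

```go
package main

import "fmt"

// boundedLog is a hypothetical model of the id and pruning rules only (an assumption).
// It is not the segmented ring buffer used by Stream.
type boundedLog[V any] struct {
	capacity int
	items    []V // retained items, oldest first
	lastID   int // id of the newest item ever added; 0 before the first Add
}

// Add appends items, drops the oldest ones beyond capacity,
// and returns the id of the last item added.
func (b *boundedLog[V]) Add(vs ...V) int {
	b.items = append(b.items, vs...)
	b.lastID += len(vs)
	if extra := len(b.items) - b.capacity; extra > 0 {
		b.items = append([]V(nil), b.items[extra:]...)
	}
	return b.lastID
}

// firstID returns the id of the oldest retained item.
func (b *boundedLog[V]) firstID() int { return b.lastID - len(b.items) + 1 }

func main() {
	b := &boundedLog[string]{capacity: 3}
	fmt.Println(b.Add("a"))           // 1
	fmt.Println(b.Add("b", "c", "d")) // 4
	fmt.Println(b.items, b.firstID()) // [b c d] 2
}
```

Under these assumptions a reader holding an offset older than the oldest retained id has missed items, so the capacity passed to NewStream bounds how far a slow reader may lag.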

func (*Stream[V]) ReadAllNonBlocking

func (s *Stream[V]) ReadAllNonBlocking(offset int) ([]V, int)

ReadAllNonBlocking returns all items in the stream, without blocking.

func (*Stream[V]) ReadBlocking

func (s *Stream[V]) ReadBlocking(ctx context.Context, offset int) ([]V, int)

ReadBlocking returns items with an id greater than the last received id reported by the caller, reading at most one segment at a time. A negative offset means read from the end. It also returns the largest id of the items; if there are no new items, it returns the id of the last item.

func (*Stream[V]) ReadNonBlocking

func (s *Stream[V]) ReadNonBlocking(offset int) ([]V, int)

ReadNonBlocking returns items with an id greater than the last received id reported by the caller, without blocking. It also returns the largest id of the items, including when there are no new items.
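The offset acts as a cursor: the caller passes back the id it received from the previous read. A minimal sketch of that polling pattern follows, using a hypothetical `idLog` type and `readAfter` method. Pruning and segment limits are left out, and negative offsets are not modeled (they are simply treated as 0 here).

```go
package main

import "fmt"

// idLog is a hypothetical model in which the item with id n is stored at items[n-1].
type idLog[V any] struct {
	items []V
}

// Add appends items and returns the id of the last one (ids start at 1).
func (l *idLog[V]) Add(vs ...V) int {
	l.items = append(l.items, vs...)
	return len(l.items)
}

// readAfter returns the items with id greater than offset and the id of the newest item.
// When nothing is new it returns no items but still reports the newest id.
// Assumption: negative offsets are treated as 0 in this sketch.
func (l *idLog[V]) readAfter(offset int) ([]V, int) {
	last := len(l.items)
	if offset < 0 {
		offset = 0
	}
	if offset >= last {
		return nil, last
	}
	return append([]V(nil), l.items[offset:]...), last
}

func main() {
	l := &idLog[string]{}
	l.Add("a", "b")

	items, cursor := l.readAfter(0)
	fmt.Println(items, cursor) // [a b] 2

	items, cursor = l.readAfter(cursor)
	fmt.Println(len(items), cursor) // 0 2

	l.Add("c")
	items, cursor = l.readAfter(cursor)
	fmt.Println(items, cursor) // [c] 3
}
```

Because the id is returned even when no items are, the caller can always store it as the next offset without special-casing an empty read.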

func (*Stream[V]) Subscribe

func (s *Stream[V]) Subscribe(ctx context.Context, callback func([]V, int) error) error

Subscribe reads the stream in a loop, passing chunks of items to the callback; it stops only when the context is canceled. It returns the last id of the items.
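A subscribe loop of this kind can be sketched as a blocking read followed by a callback, repeated until the context is done. The `miniStream` type and its methods are hypothetical. The sketch also assumes that a callback error ends the loop and that the second callback argument is the id of the last item in the chunk; neither point is confirmed by the documentation above.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// miniStream is a hypothetical unbounded stream used only to show the loop (an assumption).
type miniStream[V any] struct {
	mu    sync.Mutex
	items []V
	wake  chan struct{} // closed and replaced on every Add to wake blocked readers
}

func newMiniStream[V any]() *miniStream[V] {
	return &miniStream[V]{wake: make(chan struct{})}
}

// Add appends items, wakes blocked readers, and returns the id of the last item.
func (s *miniStream[V]) Add(vs ...V) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = append(s.items, vs...)
	close(s.wake)
	s.wake = make(chan struct{})
	return len(s.items)
}

// readBlocking waits until an item with id > offset exists or ctx is done.
func (s *miniStream[V]) readBlocking(ctx context.Context, offset int) ([]V, int) {
	for {
		s.mu.Lock()
		if offset < len(s.items) {
			out := append([]V(nil), s.items[offset:]...)
			last := len(s.items)
			s.mu.Unlock()
			return out, last
		}
		wake := s.wake
		s.mu.Unlock()
		select {
		case <-wake:
		case <-ctx.Done():
			return nil, offset
		}
	}
}

// subscribe feeds chunks to callback until ctx is done or callback fails.
func (s *miniStream[V]) subscribe(ctx context.Context, callback func([]V, int) error) error {
	offset := 0
	for {
		items, last := s.readBlocking(ctx, offset)
		if len(items) == 0 {
			return ctx.Err() // readBlocking returns no items only when ctx is done
		}
		if err := callback(items, last); err != nil {
			return err
		}
		offset = last
	}
}

// collectFive subscribes until five items have been seen, then cancels the context.
func collectFive() ([]int, error) {
	s := newMiniStream[int]()
	s.Add(1, 2, 3)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go s.Add(4, 5) // a concurrent producer
	var got []int
	err := s.subscribe(ctx, func(items []int, last int) error {
		got = append(got, items...)
		if last >= 5 {
			cancel()
		}
		return nil
	})
	return got, err
}

func main() {
	got, err := collectFive()
	fmt.Println(got, err) // [1 2 3 4 5] context canceled
}
```

The callback runs on the subscriber's goroutine, so a slow callback delays the next read rather than dropping items in this sketch.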
