eventstream

package
v0.0.0-...-be384fd
Warning

This package is not in the latest version of its module.

Published: Feb 8, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package eventstream implements a convenience client for the Events method of the Tendermint RPC service, allowing clients to observe a resumable stream of events matching a query.

Constants

This section is empty.

Variables

var ErrStopRunning = errors.New("stop accepting events")

ErrStopRunning is returned by a Run callback to signal that no more events are wanted and that Run should return.
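
The sentinel pattern described above can be sketched without the real package. In this minimal, self-contained example, errStopRunning and run are local stand-ins (not the package's own code) for ErrStopRunning and Stream.Run, and takeN is a hypothetical helper. The stand-in compares the error by identity, so the callback returns the sentinel unwrapped; whether the real Run also accepts a wrapped sentinel is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errStopRunning is a local stand-in for eventstream.ErrStopRunning.
var errStopRunning = errors.New("stop accepting events")

// run is a simplified, hypothetical stand-in for Stream.Run: it feeds items
// to accept until accept reports an error, and maps the sentinel to nil.
func run(items []string, accept func(string) error) error {
	for _, it := range items {
		if err := accept(it); err != nil {
			if err == errStopRunning {
				return nil // the callback asked to stop; not a failure
			}
			return err
		}
	}
	return nil
}

// takeN stops the loop via the sentinel once n items (n >= 1) are collected.
func takeN(items []string, n int) ([]string, error) {
	var got []string
	err := run(items, func(s string) error {
		got = append(got, s)
		if len(got) >= n {
			return errStopRunning
		}
		return nil
	})
	return got, err
}

func main() {
	got, err := takeN([]string{"a", "b", "c", "d"}, 2)
	fmt.Println(got, err)
}
```

With the real package, the same shape would apply to the accept callback passed to Run: return ErrStopRunning once enough events have been seen, and treat a nil result from Run as a clean stop.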

Functions

This section is empty.

Types

type Client

type Client interface {
	Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error)
}

Client is the subset of the RPC client interface consumed by Stream.

type MissedItemsError

type MissedItemsError struct {
	// The cursor of the newest matching item the stream has observed.
	NewestSeen string

	// The oldest cursor in the log at the point the miss was detected.
	// Any matching events between NewestSeen and OldestPresent are lost.
	OldestPresent string

	// The active query.
	Query string
}

MissedItemsError is an error that indicates the stream missed (lost) some number of events matching the specified query.

func (*MissedItemsError) Error

func (e *MissedItemsError) Error() string

Error satisfies the error interface.
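
Because Error is defined on the pointer type, callers would normally detect this error with errors.As and a *MissedItemsError target. The sketch below uses a local stand-in type with the documented fields; the message text, the cursor values "c1" and "c9", and the describe helper are illustrative assumptions, not the package's actual output or API.

```go
package main

import (
	"errors"
	"fmt"
)

// missedItemsError is a local stand-in mirroring the documented fields.
type missedItemsError struct {
	NewestSeen    string
	OldestPresent string
	Query         string
}

// Error uses an illustrative message; the real format is not documented here.
func (e *missedItemsError) Error() string {
	return fmt.Sprintf("missed events matching %q between %q and %q",
		e.Query, e.NewestSeen, e.OldestPresent)
}

// describe reports the lost cursor range if err is, or wraps, a
// *missedItemsError. It is a hypothetical helper for this sketch.
func describe(err error) (gap string, ok bool) {
	var missed *missedItemsError
	if errors.As(err, &missed) {
		return missed.NewestSeen + ".." + missed.OldestPresent, true
	}
	return "", false
}

func main() {
	inner := &missedItemsError{NewestSeen: "c1", OldestPresent: "c9", Query: "tm.event = 'Tx'"}
	wrapped := fmt.Errorf("run: %w", inner)
	fmt.Println(describe(wrapped))
	fmt.Println(describe(errors.New("unrelated")))
}
```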

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

A Stream captures the state of a streaming event subscription.

func New

func New(cli Client, query string, opts *StreamOptions) *Stream

New constructs a new stream for the given query and options. If opts == nil, the stream uses default values as described by StreamOptions. This function will panic if cli == nil.

func (*Stream) Reset

func (s *Stream) Reset()

Reset updates the stream's current cursor position to the head of the log. This method may safely be called only when Run is not executing.

func (*Stream) Run

func (s *Stream) Run(ctx context.Context, accept func(*coretypes.EventItem) error) error

Run polls the service for events matching the query, and calls accept for each such event. Run handles pagination transparently, and delivers events to accept in order of publication.

Run continues until ctx ends or accept reports an error. If accept returns ErrStopRunning, Run returns nil; otherwise Run returns the error reported by accept or ctx. Run also returns an error if the server reports an error from the Events method.

If the stream falls behind the event log on the server, Run will stop and report an error of concrete type *MissedItemsError. Call Reset to reset the stream to the head of the log, and call Run again to resume.
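
The Run, Reset, Run cycle described above can be sketched as a retry loop. This example is self-contained and does not use the real package: resettable is a simplified interface (no context or accept callback), fakeStream is a test double, and missedError stands in for *MissedItemsError. The cap on resets is an assumption of this sketch, added so a persistently lagging consumer does not loop forever.

```go
package main

import (
	"errors"
	"fmt"
)

// missedError is a local stand-in for *eventstream.MissedItemsError.
type missedError struct{ NewestSeen, OldestPresent string }

func (e *missedError) Error() string { return "missed items" }

// resettable captures the two documented methods in simplified form.
type resettable interface {
	Run() error
	Reset()
}

// fakeStream reports a miss until it has been reset `misses` times.
type fakeStream struct {
	misses int
	resets int
}

func (f *fakeStream) Run() error {
	if f.misses > f.resets {
		return &missedError{NewestSeen: "c1", OldestPresent: "c9"}
	}
	return nil
}

func (f *fakeStream) Reset() { f.resets++ }

// runWithReset calls Run and, on a miss, resets and retries, giving up
// after maxResets resets. Events lost in the gap are not recovered.
func runWithReset(s resettable, maxResets int) error {
	for attempt := 0; ; attempt++ {
		err := s.Run()
		var missed *missedError
		if !errors.As(err, &missed) {
			return err // nil, or an error that Reset cannot fix
		}
		if attempt >= maxResets {
			return err
		}
		s.Reset() // Run has returned, so calling Reset here is safe
	}
}

func main() {
	s := &fakeStream{misses: 2}
	fmt.Println(runWithReset(s, 3), s.resets)
}
```

Resetting moves the cursor to the head of the log, so a caller that cannot tolerate gaps would need to record or report the lost range before retrying.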

type StreamOptions

type StreamOptions struct {
	// How many items to request per call to the service.  The stream may pin
	// this value to a minimum default batch size.
	BatchSize int

	// If set, resume streaming from this cursor. Typically this is set to the
	// cursor of the most recently-received matching value. If empty, streaming
	// begins at the head of the log (the default).
	ResumeFrom string

	// Specifies the long poll interval. The stream may pin this value to a
	// minimum default poll interval.
	WaitTime time.Duration
}

StreamOptions are optional settings for a Stream value. A nil *StreamOptions is ready for use and provides default values as described.
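
One common way to make a nil options pointer usable is to read every setting through a nil-safe accessor that also applies the minimum. The sketch below illustrates that idea with a local stand-in type; the accessor names and the minimum values (16 items, 5 seconds) are hypothetical, since the package's actual defaults are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// streamOptions is a local stand-in with two of the documented fields.
type streamOptions struct {
	BatchSize int
	WaitTime  time.Duration
}

// Hypothetical minimums, chosen only for illustration.
const (
	minBatchSize = 16
	minWaitTime  = 5 * time.Second
)

// batchSize is safe on a nil receiver and pins small values to the minimum.
func (o *streamOptions) batchSize() int {
	if o == nil || o.BatchSize < minBatchSize {
		return minBatchSize
	}
	return o.BatchSize
}

// waitTime is safe on a nil receiver and pins short waits to the minimum.
func (o *streamOptions) waitTime() time.Duration {
	if o == nil || o.WaitTime < minWaitTime {
		return minWaitTime
	}
	return o.WaitTime
}

func main() {
	var none *streamOptions // nil options fall back to the minimums
	fmt.Println(none.batchSize(), none.waitTime())

	custom := &streamOptions{BatchSize: 100, WaitTime: time.Minute}
	fmt.Println(custom.batchSize(), custom.waitTime())
}
```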
