Documentation ¶
Overview ¶
Package events provides a generic client for subscribing to on-chain events via an EventsQueryClient and transforming the received events into the type defined by the EventsReplayClient's generic type parameter.
The EventsQueryClient emits ReplayObservables which are of the type defined by the EventsReplayClient's generic type parameter.
The use of ReplayObservables allows the EventsReplayClient to always provide the latest event data to the caller, even if the connection to the EventsQueryClient is lost and re-established, without the caller having to re-subscribe to the EventsQueryClient.
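The replay behaviour described above can be illustrated with a small, self-contained sketch. This is not the package's ReplayObservable implementation; `replayBuffer`, `Publish`, and `Last` are hypothetical names, used only to show how a bounded buffer lets a caller that arrives late still read the most recent events.

```go
package main

import (
	"fmt"
	"sync"
)

// replayBuffer is a hypothetical, simplified stand-in for a replay
// observable: it remembers up to size of the most recently published values.
type replayBuffer[T any] struct {
	mu     sync.Mutex
	size   int
	values []T
}

func newReplayBuffer[T any](size int) *replayBuffer[T] {
	return &replayBuffer[T]{size: size}
}

// Publish appends a value and drops the oldest ones once the buffer is full.
func (b *replayBuffer[T]) Publish(v T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.values = append(b.values, v)
	if len(b.values) > b.size {
		b.values = b.values[len(b.values)-b.size:]
	}
}

// Last returns up to n of the most recently published values, oldest first.
func (b *replayBuffer[T]) Last(n int) []T {
	b.mu.Lock()
	defer b.mu.Unlock()
	if n < 0 {
		n = 0
	}
	if n > len(b.values) {
		n = len(b.values)
	}
	out := make([]T, n)
	copy(out, b.values[len(b.values)-n:])
	return out
}

func main() {
	buf := newReplayBuffer[string](2)
	for _, evt := range []string{"a", "b", "c"} {
		buf.Publish(evt)
	}
	// A caller that arrives after the events were published still sees them.
	fmt.Println(buf.Last(2)) // prints [b c]
}
```

A real replay observable would also push new values to subscribers; the sketch keeps only the buffering part, since that is what allows the latest data to survive a reconnect.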
Index ¶
- Constants
- Variables
- func NewEventsQueryClient(cometWebsocketURL string, opts ...client.EventsQueryClientOption) client.EventsQueryClient
- func NewEventsReplayClient[T any](ctx context.Context, deps depinject.Config, queryString string, ...) (client.EventsReplayClient[T], error)
- func RPCToWebsocketURL(hostUrl *url.URL) string
- func WithConnRetryLimit[T any](limit int) client.EventsReplayClientOption[T]
- func WithDialer(dialer client.Dialer) client.EventsQueryClientOption
- type NewEventsFn
Examples ¶
Constants ¶
const (
	// DefaultConnRetryLimit is used to indicate how many times the
	// underlying replay client should attempt to retry if it encounters an error
	// or its connection is interrupted.
	//
	// TODO_IMPROVE: this should be configurable but can be overridden at compile-time:
	// go build -ldflags "-X github.com/pokt-network/poktroll/DefaultConnRetryLimit=value".
	DefaultConnRetryLimit = 10
)
Variables ¶
var (
	ErrEventsDial           = sdkerrors.Register(codespace, 1, "dialing for connection failed")
	ErrEventsConnClosed     = sdkerrors.Register(codespace, 2, "connection closed")
	ErrEventsSubscribe      = sdkerrors.Register(codespace, 3, "failed to subscribe to events")
	ErrEventsUnmarshalEvent = sdkerrors.Register(codespace, 4, "failed to unmarshal event bytes")
	ErrEventsConsClosed     = sdkerrors.Register(codespace, 5, "eventsqueryclient connection closed")
)
Functions ¶
func NewEventsQueryClient ¶
func NewEventsQueryClient(cometWebsocketURL string, opts ...client.EventsQueryClientOption) client.EventsQueryClient
NewEventsQueryClient returns a new events query client which is used to subscribe to on-chain events matching the given query.
Available options:
- WithDialer
func NewEventsReplayClient ¶
func NewEventsReplayClient[T any](
	ctx context.Context,
	deps depinject.Config,
	queryString string,
	newEventFn NewEventsFn[T],
	replayObsBufferSize int,
	opts ...client.EventsReplayClientOption[T],
) (client.EventsReplayClient[T], error)
NewEventsReplayClient creates a new EventsReplayClient from the given dependencies and subscription query string. It requires a decoder function (newEventFn) which decodes event subscription message bytes into the type defined by the EventsReplayClient's generic type parameter. The replayObsBufferSize is the replay buffer size of the replay observable which is notified of new events.
Required dependencies:
- client.EventsQueryClient
Example ¶
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"cosmossdk.io/depinject"

	"github.com/pokt-network/poktroll/pkg/client/events"
	"github.com/pokt-network/poktroll/pkg/polylog"
)

const (
	// Define a query string to provide to the EventsQueryClient
	// See: https://docs.cosmos.network/v0.47/learn/advanced/events#subscribing-to-events
	// And: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events
	eventQueryString = "message.action='messageActionName'"
	// Define the websocket URL the EventsQueryClient will subscribe to
	cometWebsocketURL = "ws://example.com:26657/websocket"
	// The replay buffer size of the replay observable
	replayObsBufferSize = 1
)

var _ EventType = (*eventType)(nil)

// Define an interface to represent the onchain event
type EventType interface {
	GetName() string // Illustrative only; arbitrary interfaces are supported.
}

// Define the event type that implements the interface
type eventType struct {
	Name string `json:"name"`
}

// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsFn
func eventTypeFactory(ctx context.Context) events.NewEventsFn[EventType] {
	// Define a decoder function that can take the raw event bytes
	// received from the EventsQueryClient and convert them into
	// the desired type for the EventsReplayClient
	return func(eventBz []byte) (EventType, error) {
		eventMsg := new(eventType)
		logger := polylog.Ctx(ctx)

		if err := json.Unmarshal(eventBz, eventMsg); err != nil {
			return nil, err
		}

		// Confirm the event is correct by checking its fields
		if eventMsg.Name == "" {
			logger.Error().Str("eventBz", string(eventBz)).Msg("event type is not correct")
			return nil, events.ErrEventsUnmarshalEvent.
				Wrapf("with eventType data: %s", string(eventBz))
		}

		return eventMsg, nil
	}
}

func (e *eventType) GetName() string { return e.Name }

func main() {
	// Create the events query client and a depinject config to supply
	// it into the EventsReplayClient
	// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsQueryClient
	evtClient := events.NewEventsQueryClient(cometWebsocketURL)
	depConfig := depinject.Supply(evtClient)

	// Create a context (this should be cancellable to close the EventsReplayClient)
	ctx, cancel := context.WithCancel(context.Background())

	// Create a new instance of the EventsReplayClient
	// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsReplayClient
	client, err := events.NewEventsReplayClient[EventType](
		ctx,
		depConfig,
		eventQueryString,
		eventTypeFactory(ctx),
		replayObsBufferSize,
	)
	if err != nil {
		panic(fmt.Errorf("unable to create EventsReplayClient %v", err))
	}

	// Assume the latest event emitted of type EventType has the name "testEvent"

	// Retrieve the latest emitted event
	lastEventType := client.LastNEvents(ctx, 1)[0]
	fmt.Printf("Last Event: '%s'\n", lastEventType.GetName())

	// Get the latest replay observable from the EventsReplayClient
	// in order to get the latest events from the sequence
	latestEventsObs := client.EventsSequence(ctx)

	// Get the latest events from the sequence
	lastEventType = latestEventsObs.Last(ctx, 1)[0]
	fmt.Printf("Last Event: '%s'\n", lastEventType.GetName())

	// Cancel the context which will call client.Close and close all
	// subscriptions and the EventsQueryClient
	cancel()

	// Output
	// Last Event: 'testEvent'
	// Last Event: 'testEvent'
}
Output:
func RPCToWebsocketURL ¶ added in v0.0.3
func RPCToWebsocketURL(hostUrl *url.URL) string
RPCToWebsocketURL converts the provided URL into a websocket URL string that can be used to subscribe to onchain events and query the chain via a client context or send transactions via a tx client context.
func WithConnRetryLimit ¶
func WithConnRetryLimit[T any](limit int) client.EventsReplayClientOption[T]
WithConnRetryLimit returns an option function which sets the number of times the replay client should retry in the event that it encounters an error or its connection is interrupted. If limit is < 0, it will retry indefinitely.
func WithDialer ¶
func WithDialer(dialer client.Dialer) client.EventsQueryClientOption
WithDialer returns a client.EventsQueryClientOption which sets the given dialer on the resulting eventsQueryClient when passed to NewEventsQueryClient().
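Options like this follow the functional options pattern. The sketch below shows the pattern in isolation with hypothetical types (`dialer`, `queryClient`, `queryClientOption`, `withDialer`); none of them are the package's own definitions, and the real option type may have a different shape.

```go
package main

import "fmt"

// dialer is a hypothetical, minimal stand-in for a connection dialer.
type dialer interface {
	Dial(url string) string
}

type defaultDialer struct{}

func (defaultDialer) Dial(url string) string { return "default:" + url }

// fakeDialer is the kind of replacement a test might inject.
type fakeDialer struct{}

func (fakeDialer) Dial(url string) string { return "fake:" + url }

// queryClient and queryClientOption are hypothetical illustrations of a
// client configured through functional options.
type queryClient struct {
	url    string
	dialer dialer
}

type queryClientOption func(*queryClient)

// withDialer returns an option that overrides the client's dialer.
func withDialer(d dialer) queryClientOption {
	return func(c *queryClient) { c.dialer = d }
}

// newQueryClient applies each option on top of the defaults.
func newQueryClient(url string, opts ...queryClientOption) *queryClient {
	c := &queryClient{url: url, dialer: defaultDialer{}}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newQueryClient("ws://example.com:26657/websocket", withDialer(fakeDialer{}))
	fmt.Println(c.dialer.Dial(c.url)) // prints fake:ws://example.com:26657/websocket
}
```

The benefit of this design is that the constructor keeps a stable signature while new optional behaviour is added as further option functions.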
Types ¶
type NewEventsFn ¶
type NewEventsFn[T any] func([]byte) (T, error)
NewEventsFn is a function that takes a byte slice and returns a new instance of the generic type T, or an error if the bytes cannot be decoded.
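A decoder of this shape can be written generically for JSON-encoded events. In the sketch below, `newEventsFn` is a local stand-in mirroring the function shape used in the example above, and `jsonDecoder` and `transferEvent` are hypothetical names; it assumes the event bytes are plain JSON, which may not hold for every event.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// newEventsFn is a local stand-in with the same shape as the decoder used
// in the example: raw bytes in, a value of type T (or an error) out.
type newEventsFn[T any] func([]byte) (T, error)

// transferEvent is a hypothetical event payload.
type transferEvent struct {
	Name string `json:"name"`
}

// jsonDecoder returns a decoder that unmarshals JSON bytes into a new *T.
func jsonDecoder[T any]() newEventsFn[*T] {
	return func(eventBz []byte) (*T, error) {
		evt := new(T)
		if err := json.Unmarshal(eventBz, evt); err != nil {
			return nil, err
		}
		return evt, nil
	}
}

func main() {
	decode := jsonDecoder[transferEvent]()
	evt, err := decode([]byte(`{"name":"testEvent"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(evt.Name) // prints testEvent
}
```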
Directories ¶
Path | Synopsis |
---|---|
websocket | Package websocket provides a websocket client used to connect to a cosmos-sdk based chain node and subscribe to events via the EventsQueryClient. |