events

package
v0.0.11-rc

Warning: This package is not in the latest version of its module.
Published: Nov 19, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package events provides a generic client for subscribing to on-chain events via an EventsQueryClient and transforming the received events into the type defined by the EventsReplayClient's generic type parameter.

The EventsQueryClient emits ReplayObservables which are of the type defined by the EventsReplayClient's generic type parameter.

The use of ReplayObservables allows the EventsReplayClient to always provide the latest event data to the caller, even if the connection to the EventsQueryClient is lost and re-established, without the caller having to re-subscribe to the EventsQueryClient.
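
The replay behaviour described above can be illustrated with a minimal, standard-library-only sketch. The replayBuffer type and its methods are hypothetical names invented for this illustration; they are not the package's ReplayObservable, and the sketch deliberately ignores concurrency, blocking, and reconnection. It only shows the idea that a fixed-size buffer retains the most recent values so that a late caller can still read them.

```go
package main

import "fmt"

// replayBuffer is a hypothetical, simplified stand-in for a replay
// observable: it retains only the most recent `size` values.
type replayBuffer[T any] struct {
	size   int
	values []T
}

func newReplayBuffer[T any](size int) *replayBuffer[T] {
	return &replayBuffer[T]{size: size}
}

// Publish appends v, discarding the oldest value once the buffer is full.
func (b *replayBuffer[T]) Publish(v T) {
	b.values = append(b.values, v)
	if len(b.values) > b.size {
		b.values = b.values[len(b.values)-b.size:]
	}
}

// Last returns up to n of the most recently published values, oldest first.
func (b *replayBuffer[T]) Last(n int) []T {
	if n > len(b.values) {
		n = len(b.values)
	}
	out := make([]T, n)
	copy(out, b.values[len(b.values)-n:])
	return out
}

func main() {
	buf := newReplayBuffer[string](2)
	for _, name := range []string{"a", "b", "c"} {
		buf.Publish(name)
	}
	fmt.Println(buf.Last(2)) // [b c]
	fmt.Println(buf.Last(1)) // [c]
}
```

A caller that asks for the last value after three publishes still sees "c", which is the property the replay client relies on to serve the latest event data without a fresh subscription.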

Constants

const (
	// DefaultConnRetryLimit is used to indicate how many times the
	// underlying replay client should attempt to retry if it encounters an error
	// or its connection is interrupted.
	//
	// TODO_IMPROVE: this should be configurable but can be overridden at compile-time:
	// go build -ldflags "-X github.com/pokt-network/poktroll/DefaultConnRetryLimit=value".
	DefaultConnRetryLimit = 10
)

Variables

var (
	ErrEventsDial           = sdkerrors.Register(codespace, 1, "dialing for connection failed")
	ErrEventsConnClosed     = sdkerrors.Register(codespace, 2, "connection closed")
	ErrEventsSubscribe      = sdkerrors.Register(codespace, 3, "failed to subscribe to events")
	ErrEventsUnmarshalEvent = sdkerrors.Register(codespace, 4, "failed to unmarshal event bytes")
	ErrEventsConsClosed     = sdkerrors.Register(codespace, 5, "eventsqueryclient connection closed")
)

Functions

func NewEventsQueryClient

func NewEventsQueryClient(cometWebsocketURL string, opts ...client.EventsQueryClientOption) client.EventsQueryClient

NewEventsQueryClient returns a new events query client which is used to subscribe to on-chain events matching the given query.

Available options:

  • WithDialer

func NewEventsReplayClient

func NewEventsReplayClient[T any](
	ctx context.Context,
	deps depinject.Config,
	queryString string,
	newEventFn NewEventsFn[T],
	replayObsBufferSize int,
	opts ...client.EventsReplayClientOption[T],
) (client.EventsReplayClient[T], error)

NewEventsReplayClient creates a new EventsReplayClient from the given dependencies and subscription query string; the websocket URL is configured on the injected EventsQueryClient rather than passed here. It requires a decoder function to be provided which decodes event subscription message bytes into the type defined by the EventsReplayClient's generic type parameter. The replayObsBufferSize is the replay buffer size of the replay observable which is notified of new events.

Required dependencies:

  • client.EventsQueryClient
Example
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"cosmossdk.io/depinject"

	"github.com/pokt-network/poktroll/pkg/client/events"
	"github.com/pokt-network/poktroll/pkg/polylog"
)

const (
	// Define a query string to provide to the EventsQueryClient
	// See: https://docs.cosmos.network/v0.47/learn/advanced/events#subscribing-to-events
	// And: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events
	eventQueryString = "message.action='messageActionName'"
	// Define the websocket URL the EventsQueryClient will subscribe to
	cometWebsocketURL = "ws://example.com:26657/websocket"
	// The replay buffer size of the replay observable which is notified of new events
	replayObsBufferSize = 1
)

var _ EventType = (*eventType)(nil)

// Define an interface to represent the onchain event
type EventType interface {
	GetName() string // Illustrative only; arbitrary interfaces are supported.
}

// Define the event type that implements the interface
type eventType struct {
	Name string `json:"name"`
}

// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsFn
func eventTypeFactory(ctx context.Context) events.NewEventsFn[EventType] {
	// Define a decoder function that can take the raw event bytes
	// received from the EventsQueryClient and convert them into
	// the desired type for the EventsReplayClient
	return func(eventBz []byte) (EventType, error) {
		eventMsg := new(eventType)
		logger := polylog.Ctx(ctx)

		if err := json.Unmarshal(eventBz, eventMsg); err != nil {
			return nil, err
		}

		// Confirm the event is correct by checking its fields
		if eventMsg.Name == "" {
			logger.Error().Str("eventBz", string(eventBz)).Msg("event type is not correct")
			return nil, events.ErrEventsUnmarshalEvent.
				Wrapf("with eventType data: %s", string(eventBz))
		}

		return eventMsg, nil
	}
}

func (e *eventType) GetName() string { return e.Name }

func main() {
	// Create the events query client and a depinject config to supply
	// it into the EventsReplayClient
	// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsQueryClient
	evtClient := events.NewEventsQueryClient(cometWebsocketURL)
	depConfig := depinject.Supply(evtClient)

	// Create a context (this should be cancellable to close the EventsReplayClient)
	ctx, cancel := context.WithCancel(context.Background())

	// Create a new instance of the EventsReplayClient
	// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsReplayClient
	client, err := events.NewEventsReplayClient[EventType](
		ctx,
		depConfig,
		eventQueryString,
		eventTypeFactory(ctx),
		replayObsBufferSize,
	)
	if err != nil {
		panic(fmt.Errorf("unable to create EventsReplayClient: %w", err))
	}

	// Assume the latest event emitted of type EventType has the name "testEvent"

	// Retrieve the latest emitted event
	lastEventType := client.LastNEvents(ctx, 1)[0]
	fmt.Printf("Last Event: '%s'\n", lastEventType.GetName())

	// Get the latest replay observable from the EventsReplayClient
	// in order to read the latest events from the sequence
	latestEventsObs := client.EventsSequence(ctx)
	// Get the latest events from the sequence
	lastEventType = latestEventsObs.Last(ctx, 1)[0]
	fmt.Printf("Last Event: '%s'\n", lastEventType.GetName())

	// Cancel the context which will call client.Close and close all
	// subscriptions and the EventsQueryClient
	cancel()
	// Output
	// Last Event: 'testEvent'
	// Last Event: 'testEvent'
}

func RPCToWebsocketURL added in v0.0.3

func RPCToWebsocketURL(hostUrl *url.URL) string

RPCToWebsocketURL converts the provided URL into a websocket URL string that can be used to subscribe to onchain events and query the chain via a client context or send transactions via a tx client context.
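
As a rough illustration of this kind of conversion, the sketch below builds a websocket URL from an RPC URL using only the standard library. The helper names toWebsocketURL and websocketURLFor are hypothetical, and the scheme mapping (http, tcp and ws become ws; anything else becomes wss) is an assumption made for this example, not a statement of what RPCToWebsocketURL does. The /websocket path follows the websocket URL used in the package example.

```go
package main

import (
	"fmt"
	"net/url"
)

// toWebsocketURL is a hypothetical helper, not the package's implementation.
// Assumption: plain-text schemes (http, tcp, ws) map to ws, everything else
// maps to wss, and the websocket endpoint is served at /websocket.
func toWebsocketURL(hostURL *url.URL) string {
	switch hostURL.Scheme {
	case "http", "tcp", "ws":
		return fmt.Sprintf("ws://%s/websocket", hostURL.Host)
	default:
		return fmt.Sprintf("wss://%s/websocket", hostURL.Host)
	}
}

// websocketURLFor parses a raw RPC URL and converts it in one step.
func websocketURLFor(rawURL string) (string, error) {
	hostURL, err := url.Parse(rawURL)
	if err != nil {
		return "", fmt.Errorf("parsing RPC URL: %w", err)
	}
	return toWebsocketURL(hostURL), nil
}

func main() {
	wsURL, err := websocketURLFor("http://localhost:26657")
	if err != nil {
		panic(err)
	}
	fmt.Println(wsURL) // ws://localhost:26657/websocket
}
```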

func WithConnRetryLimit

func WithConnRetryLimit[T any](limit int) client.EventsReplayClientOption[T]

WithConnRetryLimit returns an option function which sets the number of times the replay client should retry in the event that it encounters an error or its connection is interrupted. If limit is < 0, it will retry indefinitely.
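
The limit semantics can be sketched with a small standalone retry loop. The retryWithLimit helper and errConn value are hypothetical and exist only for this illustration; the sketch assumes the limit counts retries after the first attempt, which may not match how the replay client counts them internally.

```go
package main

import (
	"errors"
	"fmt"
)

// errConn is a hypothetical error standing in for an interrupted connection.
var errConn = errors.New("connection interrupted")

// retryWithLimit is a hypothetical helper illustrating the documented limit
// semantics; it is not the replay client's retry logic. It calls fn until it
// succeeds, retrying at most `limit` times after the first attempt, or
// indefinitely when limit < 0.
func retryWithLimit(limit int, fn func() error) (attempts int, err error) {
	for {
		attempts++
		if err = fn(); err == nil {
			return attempts, nil
		}
		if limit >= 0 && attempts > limit {
			return attempts, fmt.Errorf("giving up after %d retries: %w", limit, err)
		}
	}
}

func main() {
	// Fails twice, then succeeds on the third attempt.
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errConn
		}
		return nil
	}

	attempts, err := retryWithLimit(10, flaky)
	fmt.Println(attempts, err) // 3 <nil>

	// A limit of 1 allows one retry, so an always-failing fn runs twice.
	attempts, err = retryWithLimit(1, func() error { return errConn })
	fmt.Println(attempts, errors.Is(err, errConn)) // 2 true
}
```

A production retry loop would normally also wait between attempts and honour context cancellation; both are omitted here to keep the limit handling visible.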

func WithDialer

func WithDialer(dialer client.Dialer) client.EventsQueryClientOption

WithDialer returns a client.EventsQueryClientOption which sets the given dialer on the resulting eventsQueryClient when passed to NewEventsQueryClient().

Types

type NewEventsFn

type NewEventsFn[T any] func([]byte) (T, error)

NewEventsFn is a function that takes a byte slice and returns a new instance of the generic type T.
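
A decoder with this shape might look like the following sketch. To stay self-contained it declares a local newEventsFn type mirroring the signature above instead of importing the package; blockEvent and decodeBlockEvent are hypothetical names, and the JSON payload layout is an assumption made for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// newEventsFn mirrors the shape of NewEventsFn locally so that this sketch
// compiles without the poktroll module.
type newEventsFn[T any] func([]byte) (T, error)

// blockEvent is a hypothetical event payload used only for illustration.
type blockEvent struct {
	Height int64 `json:"height"`
}

// decodeBlockEvent decodes raw event bytes into a *blockEvent and rejects
// payloads that do not carry a positive height.
func decodeBlockEvent(eventBz []byte) (*blockEvent, error) {
	evt := new(blockEvent)
	if err := json.Unmarshal(eventBz, evt); err != nil {
		return nil, fmt.Errorf("decoding block event: %w", err)
	}
	if evt.Height <= 0 {
		return nil, fmt.Errorf("unexpected event payload: %s", eventBz)
	}
	return evt, nil
}

// Compile-time check that the decoder satisfies the function type.
var _ newEventsFn[*blockEvent] = decodeBlockEvent

func main() {
	evt, err := decodeBlockEvent([]byte(`{"height": 42}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(evt.Height) // 42

	_, err = decodeBlockEvent([]byte(`{"name": "other"}`))
	fmt.Println(err != nil) // true
}
```

Returning an error for payloads that decode cleanly but lack the expected fields mirrors the field check in the package example, since a subscription may deliver messages of more than one type.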

Directories

Path Synopsis
Package websocket provides a websocket client used to connect to a cosmos-sdk based chain node and subscribe to events via the EventsQueryClient.
