event

package
Version: v5.1.2-mekong

Warning: This package is not in the latest version of its module.
Published: Nov 7, 2024 License: GPL-3.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Feed

type Feed = geth_event.Feed

Feed is a re-export of the go-ethereum event feed.

type ResubscribeFunc

type ResubscribeFunc func(context.Context) (Subscription, error)

A ResubscribeFunc attempts to establish a subscription.

type SubscriberSender added in v5.1.1

type SubscriberSender interface {
	Subscribe(channel interface{}) Subscription
	Send(value interface{}) (nsent int)
}

SubscriberSender is an abstract representation of an *event.Feed to use in describing types that accept or return an *event.Feed.
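
Because code can depend on this interface instead of the concrete feed type, a fake can stand in for the feed in tests. The sketch below illustrates that idea under stated assumptions: it redeclares `Subscription` and `SubscriberSender` locally so it compiles without the package, and `recordingFeed`, `noopSub`, and `announce` are hypothetical names invented for the example.

```go
package main

import "fmt"

// Subscription is a local stand-in for event.Subscription, declared here only
// so the sketch compiles without importing the package.
type Subscription interface {
	Err() <-chan error
	Unsubscribe()
}

// SubscriberSender mirrors the interface shown above.
type SubscriberSender interface {
	Subscribe(channel interface{}) Subscription
	Send(value interface{}) (nsent int)
}

// noopSub is a hypothetical subscription that never reports an error.
type noopSub struct{ errc chan error }

func (s noopSub) Err() <-chan error { return s.errc }
func (s noopSub) Unsubscribe()      {}

// recordingFeed is a hypothetical test double: it records every value sent
// instead of delivering it to subscriber channels.
type recordingFeed struct {
	subscribers int
	sent        []interface{}
}

func (f *recordingFeed) Subscribe(channel interface{}) Subscription {
	f.subscribers++
	return noopSub{errc: make(chan error)}
}

func (f *recordingFeed) Send(value interface{}) int {
	f.sent = append(f.sent, value)
	return f.subscribers
}

// announce depends only on the interface, so it accepts a real feed or a fake.
func announce(feed SubscriberSender, height int) int {
	return feed.Send(height)
}

func main() {
	feed := &recordingFeed{}
	feed.Subscribe(make(chan int))
	n := announce(feed, 42)
	fmt.Println("delivered to", n, "subscriber(s); recorded", feed.sent)
}
```

In real code the same `announce` function could presumably be handed an `*event.Feed`, since that is the type the interface is described as abstracting.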

type Subscription

type Subscription = geth_event.Subscription

Subscription is a re-export of the go-ethereum event subscription.

func NewSubscription

func NewSubscription(producer func(<-chan struct{}) error) Subscription

NewSubscription runs a producer function as a subscription in a new goroutine. The channel given to the producer is closed when Unsubscribe is called. If producer returns an error, it is sent on the subscription's error channel.

Example
package main

import (
	"fmt"

	"github.com/prysmaticlabs/prysm/v5/async/event"
)

func main() {
	// Create a subscription that sends 10 integers on ch.
	ch := make(chan int)
	sub := event.NewSubscription(func(quit <-chan struct{}) error {
		for i := 0; i < 10; i++ {
			select {
			case ch <- i:
			case <-quit:
				fmt.Println("unsubscribed")
				return nil
			}
		}
		return nil
	})

	// This is the consumer. It reads 5 integers, then aborts the subscription.
	// Note that Unsubscribe waits until the producer has shut down.
	for i := range ch {
		fmt.Println(i)
		if i == 4 {
			sub.Unsubscribe()
			break
		}
	}
}
Output:

0
1
2
3
4
unsubscribed

func Resubscribe

func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription

Resubscribe calls fn repeatedly to keep a subscription established. When the subscription is established, Resubscribe waits for it to fail and calls fn again. This process repeats until Unsubscribe is called or the active subscription ends successfully.

Resubscribe applies backoff between calls to fn. The time between calls is adapted based on the error rate, but will never exceed backoffMax.

type SubscriptionScope

type SubscriptionScope struct {
	// contains filtered or unexported fields
}

SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.

For code that handles more than one subscription, a scope can be used to conveniently unsubscribe all of them with a single call. The example demonstrates a typical use in a larger program.

The zero value is ready to use.

Example
package main

import (
	"fmt"
	"sync"

	"github.com/prysmaticlabs/prysm/v5/async/event"
)

// This example demonstrates how SubscriptionScope can be used to control the lifetime of
// subscriptions.
//
// Our example program consists of two servers, each of which performs a calculation when
// requested. The servers also allow subscribing to results of all computations.
type divServer struct{ results event.Feed }
type mulServer struct{ results event.Feed }

func (s *divServer) do(a, b int) int {
	r := a / b
	s.results.Send(r)
	return r
}

func (s *mulServer) do(a, b int) int {
	r := a * b
	s.results.Send(r)
	return r
}

// The servers are contained in an App. The app controls the servers and exposes them
// through its API.
type App struct {
	divServer
	mulServer
	scope event.SubscriptionScope
}

func (s *App) Calc(op byte, a, b int) int {
	switch op {
	case '/':
		return s.divServer.do(a, b)
	case '*':
		return s.mulServer.do(a, b)
	default:
		panic("invalid op")
	}
}

// The app's SubscribeResults method starts sending calculation results to the given
// channel. Subscriptions created through this method are tied to the lifetime of the App
// because they are registered in the scope.
func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
	switch op {
	case '/':
		return s.scope.Track(s.divServer.results.Subscribe(ch))
	case '*':
		return s.scope.Track(s.mulServer.results.Subscribe(ch))
	default:
		panic("invalid op")
	}
}

// Stop stops the App, closing all subscriptions created through SubscribeResults.
func (s *App) Stop() {
	s.scope.Close()
}

func main() {
	// Create the app.
	var (
		app  App
		wg   sync.WaitGroup
		divs = make(chan int)
		muls = make(chan int)
	)

	// Run a subscriber in the background.
	divsub := app.SubscribeResults('/', divs)
	mulsub := app.SubscribeResults('*', muls)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer fmt.Println("subscriber exited")
		defer divsub.Unsubscribe()
		defer mulsub.Unsubscribe()
		for {
			select {
			case result := <-divs:
				fmt.Println("division happened:", result)
			case result := <-muls:
				fmt.Println("multiplication happened:", result)
			case <-divsub.Err():
				return
			case <-mulsub.Err():
				return
			}
		}
	}()

	// Interact with the app.
	app.Calc('/', 22, 11)
	app.Calc('*', 3, 4)

	// Stop the app. This shuts down the subscriptions, causing the subscriber to exit.
	app.Stop()
	wg.Wait()

}
Output:

division happened: 2
multiplication happened: 12
subscriber exited

func (*SubscriptionScope) Close

func (sc *SubscriptionScope) Close()

Close calls Unsubscribe on all tracked subscriptions and prevents further additions to the tracked set. Calls to Track after Close return nil.

func (*SubscriptionScope) Count

func (sc *SubscriptionScope) Count() int

Count returns the number of tracked subscriptions. It is meant to be used for debugging.

func (*SubscriptionScope) Track

func (sc *SubscriptionScope) Track(s Subscription) Subscription

Track starts tracking a subscription. If the scope is closed, Track returns nil. The returned subscription is a wrapper. Unsubscribing the wrapper removes it from the scope.
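
The documented behaviour of Track, Count, and Close can be summarised with a simplified model. The code below is written for illustration and is not the package's source: `scope`, `tracked`, `unsubscriber`, and `fakeSub` are hypothetical names, and the model covers only the rules stated above (the zero value is usable, the wrapper removes itself on Unsubscribe, Close unsubscribes everything, and tracking after Close yields nil).

```go
package main

import (
	"fmt"
	"sync"
)

// unsubscriber is a hypothetical stand-in for the part of a subscription
// that this model needs.
type unsubscriber interface{ Unsubscribe() }

// scope is a simplified model of the documented SubscriptionScope behaviour.
type scope struct {
	mu     sync.Mutex
	subs   map[*tracked]struct{}
	closed bool
}

// tracked is the wrapper handed back by track.
type tracked struct {
	sc *scope
	s  unsubscriber
}

// track registers s and returns a wrapper, or nil if the scope is closed.
func (sc *scope) track(s unsubscriber) *tracked {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.closed {
		return nil
	}
	if sc.subs == nil {
		sc.subs = make(map[*tracked]struct{})
	}
	t := &tracked{sc: sc, s: s}
	sc.subs[t] = struct{}{}
	return t
}

// Unsubscribe ends the inner subscription and removes the wrapper from its scope.
func (t *tracked) Unsubscribe() {
	t.s.Unsubscribe()
	t.sc.mu.Lock()
	defer t.sc.mu.Unlock()
	delete(t.sc.subs, t)
}

// close unsubscribes everything still tracked and rejects later additions.
func (sc *scope) close() {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.closed {
		return
	}
	sc.closed = true
	for t := range sc.subs {
		t.s.Unsubscribe()
	}
	sc.subs = nil
}

// count reports how many subscriptions are currently tracked.
func (sc *scope) count() int {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	return len(sc.subs)
}

// fakeSub is a hypothetical subscription that remembers whether it was stopped.
type fakeSub struct{ stopped bool }

func (f *fakeSub) Unsubscribe() { f.stopped = true }

func main() {
	var sc scope // the zero value is ready to use
	a, b := &fakeSub{}, &fakeSub{}
	ta := sc.track(a)
	sc.track(b)
	fmt.Println("tracked:", sc.count())

	ta.Unsubscribe() // removes only a from the scope
	fmt.Println("after unsubscribing one:", sc.count())

	sc.close() // stops b and closes the scope
	fmt.Println("b stopped:", b.stopped)
	fmt.Println("track after close is nil:", sc.track(&fakeSub{}) == nil)
}
```

One practical consequence of the nil return is that callers which may race with shutdown should check the result of Track before using it.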
