Documentation ¶
Overview ¶
Package event contains an event feed implementation for process communication.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Feed ¶
type Feed struct {
// contains filtered or unexported fields
}
Feed implements one-to-many subscriptions where the carrier of events is a channel. Values sent to a Feed are delivered to all subscribed channels simultaneously.
Feeds can only be used with a single type. The type is determined by the first Send or Subscribe operation. Subsequent calls to these methods panic if the type does not match.
The zero value is ready to use.
Example (AcknowledgedEvents) ¶
package main import ( "fmt" "github.com/prysmaticlabs/prysm/v5/async/event" ) func main() { // This example shows how the return value of Send can be used for request/reply // interaction between event consumers and producers. var feed event.Feed type ackedEvent struct { i int ack chan<- struct{} } // Consumers wait for events on the feed and acknowledge processing. done := make(chan struct{}) defer close(done) for i := 0; i < 3; i++ { ch := make(chan ackedEvent, 100) sub := feed.Subscribe(ch) go func() { defer sub.Unsubscribe() for { select { case ev := <-ch: fmt.Println(ev.i) // "process" the event ev.ack <- struct{}{} case <-done: return } } }() } // The producer sends values of type ackedEvent with increasing values of i. // It waits for all consumers to acknowledge before sending the next event. for i := 0; i < 3; i++ { acksignal := make(chan struct{}) n := feed.Send(ackedEvent{i, acksignal}) for ack := 0; ack < n; ack++ { <-acksignal } } }
Output: 0 0 0 1 1 1 2 2 2
func (*Feed) Send ¶
Send delivers to all subscribed channels simultaneously. It returns the number of subscribers that the value was sent to.
func (*Feed) Subscribe ¶
func (f *Feed) Subscribe(channel interface{}) Subscription
Subscribe adds a channel to the feed. Future sends will be delivered on the channel until the subscription is canceled. All channels added must have the same element type.
The channel should have ample buffer space to avoid blocking other subscribers. Slow subscribers are not dropped.
type ResubscribeFunc ¶
type ResubscribeFunc func(context.Context) (Subscription, error)
A ResubscribeFunc attempts to establish a subscription.
type Subscription ¶
type Subscription interface { Err() <-chan error // returns the error channel Unsubscribe() // cancels sending of events, closing the error channel }
Subscription represents a stream of events. The carrier of the events is typically a channel, but isn't part of the interface.
Subscriptions can fail while established. Failures are reported through an error channel. It receives a value if there is an issue with the subscription (e.g. the network connection delivering the events has been closed). Only one value will ever be sent.
The error channel is closed when the subscription ends successfully (i.e. when the source of events is closed). It is also closed when Unsubscribe is called.
The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all cases to ensure that resources related to the subscription are released. It can be called any number of times.
func NewSubscription ¶
func NewSubscription(producer func(<-chan struct{}) error) Subscription
NewSubscription runs a producer function as a subscription in a new goroutine. The channel given to the producer is closed when Unsubscribe is called. If fn returns an error, it is sent on the subscription's error channel.
Example ¶
package main import ( "fmt" "github.com/prysmaticlabs/prysm/v5/async/event" ) func main() { // Create a subscription that sends 10 integers on ch. ch := make(chan int) sub := event.NewSubscription(func(quit <-chan struct{}) error { for i := 0; i < 10; i++ { select { case ch <- i: case <-quit: fmt.Println("unsubscribed") return nil } } return nil }) // This is the consumer. It reads 5 integers, then aborts the subscription. // Note that Unsubscribe waits until the producer has shut down. for i := range ch { fmt.Println(i) if i == 4 { sub.Unsubscribe() break } } }
Output: 0 1 2 3 4 unsubscribed
func Resubscribe ¶
func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription
Resubscribe calls fn repeatedly to keep a subscription established. When the subscription is established, Resubscribe waits for it to fail and calls fn again. This process repeats until Unsubscribe is called or the active subscription ends successfully.
Resubscribe applies backoff between calls to fn. The time between calls is adapted based on the error rate, but will never exceed backoffMax.
type SubscriptionScope ¶
type SubscriptionScope struct {
// contains filtered or unexported fields
}
SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
For code that handle more than one subscription, a scope can be used to conveniently unsubscribe all of them with a single call. The example demonstrates a typical use in a larger program.
The zero value is ready to use.
Example ¶
package main import ( "fmt" "sync" "github.com/prysmaticlabs/prysm/v5/async/event" ) // This example demonstrates how SubscriptionScope can be used to control the lifetime of // subscriptions. // // Our example program consists of two servers, each of which performs a calculation when // requested. The servers also allow subscribing to results of all computations. type divServer struct{ results event.Feed } type mulServer struct{ results event.Feed } func (s *divServer) do(a, b int) int { r := a / b s.results.Send(r) return r } func (s *mulServer) do(a, b int) int { r := a * b s.results.Send(r) return r } // The servers are contained in an App. The app controls the servers and exposes them // through its API. type App struct { divServer mulServer scope event.SubscriptionScope } func (s *App) Calc(op byte, a, b int) int { switch op { case '/': return s.divServer.do(a, b) case '*': return s.mulServer.do(a, b) default: panic("invalid op") } } // The app's SubscribeResults method starts sending calculation results to the given // channel. Subscriptions created through this method are tied to the lifetime of the App // because they are registered in the scope. func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription { switch op { case '/': return s.scope.Track(s.divServer.results.Subscribe(ch)) case '*': return s.scope.Track(s.mulServer.results.Subscribe(ch)) default: panic("invalid op") } } // Stop stops the App, closing all subscriptions created through SubscribeResults. func (s *App) Stop() { s.scope.Close() } func main() { // Create the app. var ( app App wg sync.WaitGroup divs = make(chan int) muls = make(chan int) ) // Run a subscriber in the background. divsub := app.SubscribeResults('/', divs) mulsub := app.SubscribeResults('*', muls) wg.Add(1) go func() { defer wg.Done() defer fmt.Println("subscriber exited") defer divsub.Unsubscribe() defer mulsub.Unsubscribe() for { select { case result := <-divs: fmt.Println("division happened:", result) case result := <-muls: fmt.Println("multiplication happened:", result) case <-divsub.Err(): return case <-mulsub.Err(): return } } }() // Interact with the app. app.Calc('/', 22, 11) app.Calc('*', 3, 4) // Stop the app. This shuts down the subscriptions, causing the subscriber to exit. app.Stop() wg.Wait() }
Output: division happened: 2 multiplication happened: 12 subscriber exited
func (*SubscriptionScope) Close ¶
func (sc *SubscriptionScope) Close()
Close calls Unsubscribe on all tracked subscriptions and prevents further additions to the tracked set. Calls to Track after Close return nil.
func (*SubscriptionScope) Count ¶
func (sc *SubscriptionScope) Count() int
Count returns the number of tracked subscriptions. It is meant to be used for debugging.
func (*SubscriptionScope) Track ¶
func (sc *SubscriptionScope) Track(s Subscription) Subscription
Track starts tracking a subscription. If the scope is closed, Track returns nil. The returned subscription is a wrapper. Unsubscribing the wrapper removes it from the scope.