Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrMuxClosed = errors.New("event: mux closed")
Functions ¶
This section is empty.
Types ¶
type Feed ¶
type Feed struct {
// contains filtered or unexported fields
}
Example (AcknowledgedEvents) ¶
package main import ( "fmt" "github.com/neatio-net/neatio/utilities/event" ) func main() { var feed event.Feed type ackedEvent struct { i int ack chan<- struct{} } done := make(chan struct{}) defer close(done) for i := 0; i < 3; i++ { ch := make(chan ackedEvent, 100) sub := feed.Subscribe(ch) go func() { defer sub.Unsubscribe() for { select { case ev := <-ch: fmt.Println(ev.i) ev.ack <- struct{}{} case <-done: return } } }() } for i := 0; i < 3; i++ { acksignal := make(chan struct{}) n := feed.Send(ackedEvent{i, acksignal}) for ack := 0; ack < n; ack++ { <-acksignal } } }
Output:
func (*Feed) Subscribe ¶
func (f *Feed) Subscribe(channel interface{}) Subscription
type ResubscribeFunc ¶
type ResubscribeFunc func(context.Context) (Subscription, error)
type Subscription ¶
type Subscription interface { Err() <-chan error Unsubscribe() }
func NewSubscription ¶
func NewSubscription(producer func(<-chan struct{}) error) Subscription
Example ¶
package main import ( "fmt" "github.com/neatio-net/neatio/utilities/event" ) func main() { ch := make(chan int) sub := event.NewSubscription(func(quit <-chan struct{}) error { for i := 0; i < 10; i++ { select { case ch <- i: case <-quit: fmt.Println("unsubscribed") return nil } } return nil }) for i := range ch { fmt.Println(i) if i == 4 { sub.Unsubscribe() break } } }
Output:
func Resubscribe ¶
func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription
type SubscriptionScope ¶
type SubscriptionScope struct {
// contains filtered or unexported fields
}
Example ¶
package main import ( "fmt" "sync" "github.com/neatio-net/neatio/utilities/event" ) type divServer struct{ results event.Feed } type mulServer struct{ results event.Feed } func (s *divServer) do(a, b int) int { r := a / b s.results.Send(r) return r } func (s *mulServer) do(a, b int) int { r := a * b s.results.Send(r) return r } type App struct { divServer mulServer scope event.SubscriptionScope } func (s *App) Calc(op byte, a, b int) int { switch op { case '/': return s.divServer.do(a, b) case '*': return s.mulServer.do(a, b) default: panic("invalid op") } } func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription { switch op { case '/': return s.scope.Track(s.divServer.results.Subscribe(ch)) case '*': return s.scope.Track(s.mulServer.results.Subscribe(ch)) default: panic("invalid op") } } func (s *App) Stop() { s.scope.Close() } func main() { var ( app App wg sync.WaitGroup divs = make(chan int) muls = make(chan int) ) divsub := app.SubscribeResults('/', divs) mulsub := app.SubscribeResults('*', muls) wg.Add(1) go func() { defer wg.Done() defer fmt.Println("subscriber exited") defer divsub.Unsubscribe() defer mulsub.Unsubscribe() for { select { case result := <-divs: fmt.Println("division happened:", result) case result := <-muls: fmt.Println("multiplication happened:", result) case <-divsub.Err(): return case <-mulsub.Err(): return } } }() app.Calc('/', 22, 11) app.Calc('*', 3, 4) app.Stop() wg.Wait() }
Output:
func (*SubscriptionScope) Close ¶
func (sc *SubscriptionScope) Close()
func (*SubscriptionScope) Count ¶
func (sc *SubscriptionScope) Count() int
func (*SubscriptionScope) Track ¶
func (sc *SubscriptionScope) Track(s Subscription) Subscription
type TypeMux ¶
type TypeMux struct {
// contains filtered or unexported fields
}
Example ¶
type someEvent struct{ I int } type otherEvent struct{ S string } type yetAnotherEvent struct{ X, Y int } var mux TypeMux done := make(chan struct{}) sub := mux.Subscribe(someEvent{}, otherEvent{}) go func() { for event := range sub.Chan() { fmt.Printf("Received: %#v\n", event.Data) } fmt.Println("done") close(done) }() mux.Post(someEvent{5}) mux.Post(yetAnotherEvent{X: 3, Y: 4}) mux.Post(someEvent{6}) mux.Post(otherEvent{"whoa"}) mux.Stop() <-done
Output:
func (*TypeMux) Subscribe ¶
func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription
type TypeMuxEvent ¶
type TypeMuxSubscription ¶
type TypeMuxSubscription struct {
// contains filtered or unexported fields
}
func (*TypeMuxSubscription) Chan ¶
func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent
func (*TypeMuxSubscription) Closed ¶
func (s *TypeMuxSubscription) Closed() bool
func (*TypeMuxSubscription) Unsubscribe ¶
func (s *TypeMuxSubscription) Unsubscribe()
Click to show internal directories.
Click to hide internal directories.