event

package
v1.7.3-0...-f4d7a22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2023 License: GPL-3.0 Imports: 7 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
// ErrMuxClosed is a package-level sentinel error carrying the message
// "event: mux closed".
// NOTE(review): presumably returned by TypeMux.Post once the mux has been
// stopped — confirm against the implementation.
var ErrMuxClosed = errors.New("event: mux closed")

Functions

This section is empty.

Types

type Feed

type Feed struct {
	// contains filtered or unexported fields
}
Example (AcknowledgedEvents)
package main

import (
	"fmt"

	"github.com/neatio-net/neatio/utilities/event"
)

// main demonstrates a request/acknowledge exchange built on Feed: the
// producer uses the count returned by Send to know how many acknowledgements
// to wait for before sending the next event.
func main() {

	var feed event.Feed
	// ackedEvent carries a payload plus a send-only channel on which the
	// consumer reports that it has handled the event.
	type ackedEvent struct {
		i   int
		ack chan<- struct{}
	}

	// done is closed when main returns, which tells every consumer goroutine
	// to exit.
	done := make(chan struct{})
	defer close(done)
	// Start three consumers, each with its own buffered channel subscribed
	// to the feed.
	for i := 0; i < 3; i++ {
		ch := make(chan ackedEvent, 100)
		sub := feed.Subscribe(ch)
		go func() {
			defer sub.Unsubscribe()
			for {
				select {
				case ev := <-ch:
					// "Process" the event, then acknowledge it to the producer.
					fmt.Println(ev.i)
					ev.ack <- struct{}{}
				case <-done:
					return
				}
			}
		}()
	}

	// Producer: send events with increasing i, and wait for n acknowledgements
	// before sending the next one.
	// NOTE(review): n is Send's result (named nsent in its signature);
	// presumably the number of subscribers the value was delivered to — confirm.
	for i := 0; i < 3; i++ {
		// acksignal is unbuffered, so each consumer's ack send completes only
		// when the producer receives it below.
		acksignal := make(chan struct{})
		n := feed.Send(ackedEvent{i, acksignal})
		for ack := 0; ack < n; ack++ {
			<-acksignal
		}
	}

}
Output:

func (*Feed) Send

func (f *Feed) Send(value interface{}) (nsent int)

func (*Feed) Subscribe

func (f *Feed) Subscribe(channel interface{}) Subscription

type ResubscribeFunc

type ResubscribeFunc func(context.Context) (Subscription, error)

type Subscription

// Subscription is the handle type returned by Feed.Subscribe, NewSubscription
// and Resubscribe.
type Subscription interface {
	// Err returns a receive-only channel of errors.
	// NOTE(review): the examples select on this channel to detect that the
	// subscription has ended; whether it is closed or delivers a value in
	// that case is not visible here — confirm against the implementation.
	Err() <-chan error
	// Unsubscribe is called by consumers when they are done with the
	// subscription.
	// NOTE(review): presumably stops event delivery — confirm.
	Unsubscribe()
}

func NewSubscription

func NewSubscription(producer func(<-chan struct{}) error) Subscription
Example
package main

import (
	"fmt"

	"github.com/neatio-net/neatio/utilities/event"
)

// main demonstrates NewSubscription: a producer function runs until it either
// finishes on its own or is told to quit, and the consumer stops it with
// Unsubscribe.
func main() {

	// Producer: tries to send the integers 0..9 on the unbuffered channel ch.
	// It stops early, printing "unsubscribed", if quit becomes readable first.
	// NOTE(review): presumably quit is signalled by sub.Unsubscribe — confirm.
	ch := make(chan int)
	sub := event.NewSubscription(func(quit <-chan struct{}) error {
		for i := 0; i < 10; i++ {
			select {
			case ch <- i:
			case <-quit:
				fmt.Println("unsubscribed")
				return nil
			}
		}
		return nil
	})

	// Consumer: prints the values 0 through 4, then unsubscribes and leaves
	// the loop. ch is never closed in this example, so the break is the only
	// exit from the range loop.
	for i := range ch {
		fmt.Println(i)
		if i == 4 {
			sub.Unsubscribe()
			break
		}
	}

}
Output:

func Resubscribe

func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription

type SubscriptionScope

type SubscriptionScope struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"fmt"
	"sync"

	"github.com/neatio-net/neatio/utilities/event"
)

// divServer performs integer divisions and publishes each result on its
// results feed.
type divServer struct{ results event.Feed }
// mulServer performs integer multiplications and publishes each result on its
// results feed.
type mulServer struct{ results event.Feed }

// do computes the integer quotient a / b, publishes it on the results feed
// and returns it. The count returned by Send is discarded. As with any Go
// integer division, b == 0 panics at run time.
func (s *divServer) do(a, b int) int {
	quotient := a / b
	s.results.Send(quotient)
	return quotient
}

// do computes the product a * b, publishes it on the results feed and returns
// it. The count returned by Send is discarded.
func (s *mulServer) do(a, b int) int {
	product := a * b
	s.results.Send(product)
	return product
}

// App bundles the two calculation servers with a SubscriptionScope that
// tracks every subscription handed out through SubscribeResults.
//
// Both embedded types declare a `do` method and a `results` field, so those
// selectors are ambiguous on App and must be qualified (s.divServer.do,
// s.mulServer.results, ...).
type App struct {
	divServer
	mulServer
	scope event.SubscriptionScope
}

// Calc dispatches the operation selected by op ('/' for division, '*' for
// multiplication) to the matching embedded server and returns its result.
// Any other op value panics with "invalid op".
func (s *App) Calc(op byte, a, b int) int {
	if op == '/' {
		return s.divServer.do(a, b)
	}
	if op == '*' {
		return s.mulServer.do(a, b)
	}
	panic("invalid op")
}

// SubscribeResults subscribes ch to the results feed of the server selected
// by op ('/' or '*') and registers the new subscription with the app's scope
// via Track. Any other op value panics with "invalid op".
func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
	// Pick the feed first so the Subscribe/Track call is written only once.
	var results *event.Feed
	switch op {
	case '/':
		results = &s.divServer.results
	case '*':
		results = &s.mulServer.results
	default:
		panic("invalid op")
	}
	return s.scope.Track(results.Subscribe(ch))
}

// Stop closes the app's subscription scope.
// NOTE(review): presumably this ends every subscription registered through
// Track — confirm against SubscriptionScope.Close.
func (s *App) Stop() {
	s.scope.Close()
}

// main demonstrates SubscriptionScope: a background subscriber listens for
// results from both servers and is shut down through App.Stop.
func main() {

	var (
		app  App
		wg   sync.WaitGroup
		divs = make(chan int)
		muls = make(chan int)
	)

	// Run a subscriber in the background. Both result channels are
	// unbuffered.
	divsub := app.SubscribeResults('/', divs)
	mulsub := app.SubscribeResults('*', muls)
	wg.Add(1)
	go func() {
		// Deferred calls run in reverse order: both subscriptions are
		// unsubscribed first, then "subscriber exited" is printed, and only
		// then is the WaitGroup released.
		defer wg.Done()
		defer fmt.Println("subscriber exited")
		defer divsub.Unsubscribe()
		defer mulsub.Unsubscribe()
		for {
			select {
			case result := <-divs:
				fmt.Println("division happened:", result)
			case result := <-muls:
				fmt.Println("multiplication happened:", result)
			// Leave the loop as soon as either subscription's Err channel
			// becomes readable.
			case <-divsub.Err():
				return
			case <-mulsub.Err():
				return
			}
		}
	}()

	// Interact with the app from the main goroutine.
	app.Calc('/', 22, 11)
	app.Calc('*', 3, 4)

	// Stop closes the scope.
	// NOTE(review): presumably that is what makes the Err channels above
	// readable — confirm. Wait then blocks until the subscriber goroutine
	// has finished.
	app.Stop()
	wg.Wait()

}
Output:

func (*SubscriptionScope) Close

func (sc *SubscriptionScope) Close()

func (*SubscriptionScope) Count

func (sc *SubscriptionScope) Count() int

func (*SubscriptionScope) Track

func (sc *SubscriptionScope) Track(s Subscription) Subscription

type TypeMux

type TypeMux struct {
	// contains filtered or unexported fields
}
Example
// Three distinct event types; only the first two are subscribed to below.
type someEvent struct{ I int }
type otherEvent struct{ S string }
type yetAnotherEvent struct{ X, Y int }

var mux TypeMux

// Start a subscriber that listens for someEvent and otherEvent values.
done := make(chan struct{})
sub := mux.Subscribe(someEvent{}, otherEvent{})
go func() {
	// Each item received is a *TypeMuxEvent; Data is printed in Go syntax.
	// The range loop ends only when the subscription channel is closed.
	// NOTE(review): presumably mux.Stop closes it — confirm.
	for event := range sub.Chan() {
		fmt.Printf("Received: %#v\n", event.Data)
	}
	fmt.Println("done")
	close(done)
}()

// Post some events. The error returned by Post is ignored here.
// NOTE(review): yetAnotherEvent was not passed to Subscribe, so presumably
// it is not delivered to sub — confirm.
mux.Post(someEvent{5})
mux.Post(yetAnotherEvent{X: 3, Y: 4})
mux.Post(someEvent{6})
mux.Post(otherEvent{"whoa"})

// Stop the mux, then wait for the subscriber goroutine to report completion.
mux.Stop()

<-done
Output:

func (*TypeMux) Post

func (mux *TypeMux) Post(ev interface{}) error

func (*TypeMux) Stop

func (mux *TypeMux) Stop()

func (*TypeMux) Subscribe

func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription

type TypeMuxEvent

// TypeMuxEvent is the element type delivered on a TypeMuxSubscription's
// channel (Chan returns <-chan *TypeMuxEvent).
type TypeMuxEvent struct {
	// NOTE(review): presumably the time at which the event was posted — confirm.
	Time time.Time
	// Data is the field the TypeMux example prints for each received event.
	// NOTE(review): presumably the value that was passed to Post — confirm.
	Data interface{}
}

type TypeMuxSubscription

type TypeMuxSubscription struct {
	// contains filtered or unexported fields
}

func (*TypeMuxSubscription) Chan

func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent

func (*TypeMuxSubscription) Closed

func (s *TypeMuxSubscription) Closed() bool

func (*TypeMuxSubscription) Unsubscribe

func (s *TypeMuxSubscription) Unsubscribe()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL