Documentation ¶
Index ¶
- Variables
- func IDFunc[T any](input T) T
- type Batcher
- type BatcherCmd
- type Broadcaster
- func (r *Broadcaster[M]) Add(client *HubClient[M]) error
- func (r *Broadcaster[M]) AddRoute(client *HubClient[M], topicIds ...TopicIdType) error
- func (r *Broadcaster[M]) Remove(client *HubClient[M]) error
- func (r *Broadcaster[M]) RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error
- func (r *Broadcaster[M]) RouteMessage(msg Message[M], source *HubClient[M]) error
- type Connector
- type FanIn
- type FanInCmd
- type FanOut
- type FanOutCmd
- type FilterFunc
- type Hub
- type HubClient
- type HubControlEvent
- type HubMessage
- type HubReader
- type HubWriter
- type Input
- type KVRouter
- func (r *KVRouter[M]) Add(client *HubClient[M]) error
- func (r *KVRouter[M]) AddRoute(client *HubClient[M], topicIds ...TopicIdType) error
- func (r *KVRouter[M]) Remove(client *HubClient[M]) error
- func (r *KVRouter[M]) RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error
- func (r *KVRouter[M]) RouteMessage(msg Message[M], source *HubClient[M]) error
- type Message
- type Pipe
- type Reader
- type ReaderFunc
- type Router
- type RunnerBase
- type TopicIdType
- type Writer
- type WriterFunc
Constants ¶
This section is empty.
Variables ¶
var (
InvalidHubClientError = errors.New("A hub client must have either a reader or a writer or both")
)
Functions ¶
Types ¶
type Batcher ¶ added in v0.0.20
type Batcher[T any, U any] struct {
    FlushPeriod time.Duration
    Joiner      func(inputs []T) (outputs U)
    // contains filtered or unexported fields
}
func NewBatcher ¶ added in v0.0.20
func NewIDBatcher ¶ added in v0.0.20
func (*Batcher[T, U]) InputChannel ¶ added in v0.0.20
func (fo *Batcher[T, U]) InputChannel() chan<- T
type BatcherCmd ¶ added in v0.0.21
type Broadcaster ¶ added in v0.0.33
type Broadcaster[M any] struct {
    // contains filtered or unexported fields
}
A type of router that forwards a message to all of its connected clients (except the one it originated from).
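The broadcast rule described above can be illustrated with a minimal, self-contained sketch. It does not use this package's types: `client`, `broadcaster`, and their methods are hypothetical stand-ins, and messages are plain strings for brevity.

```go
package main

import "fmt"

// client is a hypothetical stand-in for a hub client.
// It simply records every message delivered to it.
type client struct {
	name     string
	received []string
}

// broadcaster is a hypothetical, simplified router that
// only tracks the set of connected clients.
type broadcaster struct {
	clients map[*client]bool
}

func newBroadcaster() *broadcaster {
	return &broadcaster{clients: make(map[*client]bool)}
}

func (b *broadcaster) add(c *client)    { b.clients[c] = true }
func (b *broadcaster) remove(c *client) { delete(b.clients, c) }

// routeMessage delivers msg to every connected client except source.
func (b *broadcaster) routeMessage(msg string, source *client) {
	for c := range b.clients {
		if c == source {
			continue // never echo a message back to its sender
		}
		c.received = append(c.received, msg)
	}
}

func main() {
	a, b, c := &client{name: "a"}, &client{name: "b"}, &client{name: "c"}
	r := newBroadcaster()
	r.add(a)
	r.add(b)
	r.add(c)
	r.routeMessage("hello", a)
	fmt.Println(len(a.received), len(b.received), len(c.received)) // prints "0 1 1"
}
```

Because every client receives every message, topic subscriptions have no bearing on delivery in this sketch.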
func NewBroadcaster ¶ added in v0.0.33
func NewBroadcaster[M any]() *Broadcaster[M]
func (*Broadcaster[M]) Add ¶ added in v0.0.33
func (r *Broadcaster[M]) Add(client *HubClient[M]) error
func (*Broadcaster[M]) AddRoute ¶ added in v0.0.33
func (r *Broadcaster[M]) AddRoute(client *HubClient[M], topicIds ...TopicIdType) error
func (*Broadcaster[M]) Remove ¶ added in v0.0.33
func (r *Broadcaster[M]) Remove(client *HubClient[M]) error
func (*Broadcaster[M]) RemoveRoute ¶ added in v0.0.33
func (r *Broadcaster[M]) RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error
func (*Broadcaster[M]) RouteMessage ¶ added in v0.0.33
func (r *Broadcaster[M]) RouteMessage(msg Message[M], source *HubClient[M]) error
type Connector ¶ added in v0.0.37
type Connector[M any] struct {
    // Called when a new message is received
    OnMessage func(msg Message[M]) error
    // Called when the connection is closed and quit (won't be called on reconnects)
    OnClose func()
    // contains filtered or unexported fields
}
func NewConnector ¶ added in v0.0.37
func NewConnector[M any](connect func() error, read ReaderFunc[M]) *Connector[M]
Encapsulates a connection object: it connects and then continuously reads from the connection. It also provides facilities such as reconnecting on failures or closes (by calling the connect method again), with retries and backoff.
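As an illustration of the connect, read, and reconnect cycle described above, here is a minimal sketch. It is not the package's implementation: `runConnector`, `errDone`, the retry limit, and the doubling backoff are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDone is a hypothetical sentinel a reader returns to request a clean shutdown.
var errDone = errors.New("done")

// runConnector is a hypothetical loop: connect, read until a read fails, then
// connect again. Failed connects are retried with a doubling delay, and the
// loop gives up after maxRetries consecutive failures.
func runConnector(connect func() error, read func() (string, error),
	onMessage func(string), maxRetries int, backoff time.Duration) error {
	failures, delay := 0, backoff
	for {
		if err := connect(); err != nil {
			failures++
			if failures > maxRetries {
				return fmt.Errorf("giving up after %d attempts: %w", failures, err)
			}
			time.Sleep(delay)
			delay *= 2
			continue
		}
		failures, delay = 0, backoff // connected: reset the retry state
		for {
			msg, err := read()
			if errors.Is(err, errDone) {
				return nil
			}
			if err != nil {
				break // read failed: fall through and reconnect
			}
			onMessage(msg)
		}
	}
}

func main() {
	// A scripted connection that fails once in the middle of reading.
	script := []struct {
		msg string
		err error
	}{{"a", nil}, {"", errors.New("connection reset")}, {"b", nil}, {"", errDone}}
	i, connects := 0, 0
	connect := func() error { connects++; return nil }
	read := func() (string, error) { s := script[i]; i++; return s.msg, s.err }

	var got []string
	err := runConnector(connect, read, func(m string) { got = append(got, m) }, 3, time.Millisecond)
	fmt.Println(got, connects, err) // prints "[a b] 2 <nil>"
}
```

The sketch reconnects immediately after a read failure and only backs off when the connect call itself fails; a real connector may apply its delay differently.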
type FanIn ¶
type FanIn[T any] struct {
    RunnerBase[FanInCmd[T]]
    // Called when a channel is removed so the caller can
    // perform other cleanups etc based on this
    OnChannelRemoved func(fi *FanIn[T], inchan <-chan T)
    // contains filtered or unexported fields
}
type FanOut ¶
type FanOut[T any, U any] struct {
    RunnerBase[FanOutCmd[T, U]]
    Mapper func(inputs T) U
    // contains filtered or unexported fields
}
FanOut takes a message from one channel, applies a mapper function, and fans it out to N output channels.
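The map-then-fan-out behaviour can be sketched in a few lines of plain Go. The `fanOut` function below is a hypothetical helper written for this example, not the package's `FanOut` type; it ignores per-channel filters, dynamic adding and removing of channels, and the control channel.

```go
package main

import "fmt"

// fanOut is a hypothetical helper: it reads each value from in, applies
// mapper once, and sends the mapped value to every output channel.
// When in is closed, it closes all outputs.
func fanOut[T any, U any](in <-chan T, mapper func(T) U, outs ...chan<- U) {
	for v := range in {
		mapped := mapper(v)
		for _, out := range outs {
			out <- mapped
		}
	}
	for _, out := range outs {
		close(out)
	}
}

func main() {
	in := make(chan int)
	// Buffered outputs so fanOut never blocks in this small demo.
	out1 := make(chan string, 3)
	out2 := make(chan string, 3)

	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()

	fanOut[int, string](in, func(i int) string { return fmt.Sprintf("#%d", i) }, out1, out2)

	for s := range out1 {
		fmt.Println("out1:", s)
	}
	for s := range out2 {
		fmt.Println("out2:", s)
	}
}
```

In this sketch a slow consumer on an unbuffered output would stall delivery to every other output, which is one reason a real fan-out may buffer or run each send separately.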
type FanOutCmd ¶ added in v0.0.21
type FanOutCmd[T any, U any] struct {
    Name           string
    Filter         FilterFunc[T]
    SelfOwned      bool
    AddedChannel   chan<- U
    RemovedChannel chan<- U
}
type FilterFunc ¶ added in v0.0.33
type Hub ¶ added in v0.0.30
type Hub[M any] struct {
    // contains filtered or unexported fields
}
type HubClient ¶ added in v0.0.30
type HubClient[M any] struct {
    Write   HubWriter[M]
    Read    HubReader[M]
    OnClose func()
    // contains filtered or unexported fields
}
func (*HubClient[M]) Disconnect ¶ added in v0.0.30
func (h *HubClient[M]) Disconnect()
func (*HubClient[M]) Subscribe ¶ added in v0.0.30
func (h *HubClient[M]) Subscribe(topicIds ...TopicIdType)
func (*HubClient[M]) Unsubscribe ¶ added in v0.0.30
func (h *HubClient[M]) Unsubscribe(topicIds ...TopicIdType)
type HubControlEvent ¶ added in v0.0.30
type HubControlEvent[M any] struct {
    Client               *HubClient[M]
    Quit                 bool
    Pause                bool
    AddedSubscriptions   []TopicIdType
    RemovedSubscriptions []TopicIdType
}
type HubMessage ¶ added in v0.0.30
type Input ¶ added in v0.0.37
type Input[T any] struct {
    // contains filtered or unexported fields
}
type KVRouter ¶ added in v0.0.30
type KVRouter[M any] struct {
    // contains filtered or unexported fields
}
A type of router that maintains a topicId -> Conn[] mapping keyed by a fixed set of topics.
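A topic-keyed routing table of this kind can be sketched as a map from topic to a set of subscribers. Everything below is hypothetical and independent of the package: `subscriber`, `kvRouter`, and the `topic:payload` message convention are assumptions for the example, as is the choice to skip the source client during delivery.

```go
package main

import (
	"fmt"
	"strings"
)

// subscriber is a hypothetical stand-in for a hub client.
type subscriber struct {
	name     string
	received []string
}

// kvRouter is a hypothetical router: msgToKey extracts a topic from each
// message, and routes maps that topic to the subscribers interested in it.
type kvRouter struct {
	msgToKey func(msg string) string
	routes   map[string]map[*subscriber]bool
}

func newKVRouter(msgToKey func(msg string) string) *kvRouter {
	return &kvRouter{msgToKey: msgToKey, routes: make(map[string]map[*subscriber]bool)}
}

// addRoute subscribes s to each of the given topics.
func (r *kvRouter) addRoute(s *subscriber, topics ...string) {
	for _, t := range topics {
		if r.routes[t] == nil {
			r.routes[t] = make(map[*subscriber]bool)
		}
		r.routes[t][s] = true
	}
}

// removeRoute unsubscribes s from each of the given topics.
func (r *kvRouter) removeRoute(s *subscriber, topics ...string) {
	for _, t := range topics {
		delete(r.routes[t], s)
	}
}

// routeMessage delivers msg to the subscribers of its topic, skipping source
// (an assumption made here; pass nil to deliver to all of them).
func (r *kvRouter) routeMessage(msg string, source *subscriber) {
	for s := range r.routes[r.msgToKey(msg)] {
		if s != source {
			s.received = append(s.received, msg)
		}
	}
}

func main() {
	// Assumed convention for this demo: messages look like "topic:payload".
	topicOf := func(msg string) string {
		topic, _, _ := strings.Cut(msg, ":")
		return topic
	}
	r := newKVRouter(topicOf)
	alice, bob := &subscriber{name: "alice"}, &subscriber{name: "bob"}
	r.addRoute(alice, "news")
	r.addRoute(bob, "sports")
	r.routeMessage("news:hello", nil)
	fmt.Println(len(alice.received), len(bob.received)) // prints "1 0"
}
```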
func NewKVRouter ¶ added in v0.0.30
func NewKVRouter[M any](msgToKey func(msg M) TopicIdType) *KVRouter[M]
func (*KVRouter[M]) AddRoute ¶ added in v0.0.30
func (r *KVRouter[M]) AddRoute(client *HubClient[M], topicIds ...TopicIdType) error
func (*KVRouter[M]) RemoveRoute ¶ added in v0.0.30
func (r *KVRouter[M]) RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error
type Pipe ¶
type Pipe[T any] struct {
    RunnerBase[string]
    OnDone func(p *Pipe[T])
    // contains filtered or unexported fields
}
type Reader ¶ added in v0.0.37
type Reader[R any] struct {
    RunnerBase[string]
    Read   ReaderFunc[R]
    OnDone func(r *Reader[R])
    // contains filtered or unexported fields
}
func NewReader ¶
func NewReader[R any](read ReaderFunc[R]) *Reader[R]
type ReaderFunc ¶ added in v0.0.37
type Router ¶ added in v0.0.30
type Router[M any] interface {
    // Called when a new client has joined the group
    Add(client *HubClient[M]) error
    // Called when a client has dropped off
    Remove(client *HubClient[M]) error
    AddRoute(client *HubClient[M], topicIds ...TopicIdType) error
    RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error
    RouteMessage(msg Message[M], source *HubClient[M]) error
}
In our hub the main problem is, given a message, identifying all the clients it should be sent to. Instead of keeping a fixed key -> HubClient[] mapping, it is easier to hand each message to a router delegate that identifies all candidate HubClients. This Router does that.
type RunnerBase ¶ added in v0.0.37
type RunnerBase[C any] struct {
    // contains filtered or unexported fields
}
func NewRunnerBase ¶ added in v0.0.37
func NewRunnerBase[C any](stopVal C) RunnerBase[C]
func (*RunnerBase[C]) IsRunning ¶ added in v0.0.37
func (r *RunnerBase[C]) IsRunning() bool
func (*RunnerBase[C]) Stop ¶ added in v0.0.37
func (r *RunnerBase[C]) Stop() error
This method is called to stop the runner. It is up to the embedding types to listen for messages on the control channel and initiate the wind-down and cleanup process.
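The stop protocol described above, where a stop value travels over a control channel and the running goroutine is responsible for noticing it, might look like the following sketch. The `runner` type and its fields are hypothetical and are not the package's `RunnerBase`.

```go
package main

import "fmt"

// runner is a hypothetical type: control carries commands, and one
// command value (stopVal) is reserved to mean "wind down".
type runner struct {
	control chan string
	done    chan struct{}
	stopVal string
}

func newRunner(stopVal string) *runner {
	return &runner{
		control: make(chan string),
		done:    make(chan struct{}),
		stopVal: stopVal,
	}
}

// start launches the worker goroutine. The worker owns the wind-down:
// it watches the control channel and exits when it sees the stop value.
func (r *runner) start(input <-chan int, handle func(int)) {
	go func() {
		defer close(r.done) // signal that cleanup has finished
		for {
			select {
			case cmd := <-r.control:
				if cmd == r.stopVal {
					return
				}
			case v := <-input:
				handle(v)
			}
		}
	}()
}

// stop sends the stop value and waits for the worker to finish.
func (r *runner) stop() {
	r.control <- r.stopVal
	<-r.done
}

func main() {
	r := newRunner("stop")
	input := make(chan int)
	sum := 0
	r.start(input, func(v int) { sum += v })
	input <- 1
	input <- 2
	r.stop()
	fmt.Println(sum) // prints "3"
}
```

Waiting on `done` inside `stop` is a choice made for this sketch so the caller can safely read results afterwards; the sketch also assumes `input` is never closed while the runner is active.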
type TopicIdType ¶ added in v0.0.36
type TopicIdType interface{}
type Writer ¶ added in v0.0.37
type Writer[W any] struct {
    RunnerBase[string]
    Write WriterFunc[W]
    // contains filtered or unexported fields
}
func NewWriter ¶
func NewWriter[W any](write WriterFunc[W]) *Writer[W]
type WriterFunc ¶ added in v0.0.37