conc

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 10, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	InvalidHubClientError = errors.New("A hub client must have either a reader or a writer or both")
)

Functions

This section is empty.

Types

type Broadcaster

type Broadcaster[M any] struct {
	// contains filtered or unexported fields
}

*

  • A type of router that forwards a message to *all* its
  • connected clients (except the one it originated from).

func NewBroadcaster

func NewBroadcaster[M any]() *Broadcaster[M]

func (*Broadcaster[M]) Add

func (r *Broadcaster[M]) Add(client *HubClient[M]) error

func (*Broadcaster[M]) AddRoute

func (r *Broadcaster[M]) AddRoute(client *HubClient[M], topicIds ...TopicIdType) error

func (*Broadcaster[M]) Remove

func (r *Broadcaster[M]) Remove(client *HubClient[M]) error

func (*Broadcaster[M]) RemoveRoute

func (r *Broadcaster[M]) RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error

func (*Broadcaster[M]) RouteMessage

func (r *Broadcaster[M]) RouteMessage(msg Message[M], source *HubClient[M]) error

type Connector

type Connector[M any] struct {

	// Called when a new message is received
	OnMessage func(msg Message[M]) error

	// Called when the connection is closed and quit (wont be called on reconnects)
	OnClose func()
	// contains filtered or unexported fields
}

func NewConnector

func NewConnector[M any](connect func() error, read ReaderFunc[M]) *Connector[M]

*

  • Encapsulates a connection object which connects and continuously
  • reads out of it. This connection also has other facilities like
  • reconnecting on failures or closes (by recalling the connect
  • method) with retries and backoffs etc.

func (*Connector[M]) Start

func (c *Connector[M]) Start()

func (*Connector[M]) Stop

func (c *Connector[M]) Stop()

type Hub

type Hub[M any] struct {
	// contains filtered or unexported fields
}

func NewHub

func NewHub[M any](router Router[M]) *Hub[M]

func (*Hub[M]) Connect

func (h *Hub[M]) Connect(reader HubReader[M], writer HubWriter[M]) (*HubClient[M], error)

func (*Hub[M]) Send

func (s *Hub[M]) Send(message Message[M], callbackChan chan M) error

func (*Hub[M]) Stop

func (s *Hub[M]) Stop()

*

  • Stop the Hub and cleanup.

type HubClient

type HubClient[M any] struct {
	Write   HubWriter[M]
	Read    HubReader[M]
	OnClose func()
	// contains filtered or unexported fields
}

func (*HubClient[M]) Disconnect

func (h *HubClient[M]) Disconnect()

func (*HubClient[M]) GetId

func (h *HubClient[M]) GetId() string

func (*HubClient[M]) Subscribe

func (h *HubClient[M]) Subscribe(topicIds ...TopicIdType)

func (*HubClient[M]) Unsubscribe

func (h *HubClient[M]) Unsubscribe(topicIds ...TopicIdType)

type HubControlEvent

type HubControlEvent[M any] struct {
	Client               *HubClient[M]
	Quit                 bool
	Pause                bool
	AddedSubscriptions   []TopicIdType
	RemovedSubscriptions []TopicIdType
}

type HubMessage

type HubMessage[M any] struct {
	Message  Message[M]
	Callback chan M
}

type HubReader

type HubReader[M any] func() Message[M]

A function for reading from a hub

type HubWriter

type HubWriter[M any] func(Message[M]) error

A function that can write to a particular hub

type KVRouter

type KVRouter[M any] struct {
	// contains filtered or unexported fields
}

*

  • A type of router that maintains an topicId -> Conn[] by
  • a fixed topics.

func NewKVRouter

func NewKVRouter[M any](msgToKey func(msg M) TopicIdType) *KVRouter[M]

func (*KVRouter[M]) Add

func (r *KVRouter[M]) Add(client *HubClient[M]) error

func (*KVRouter[M]) AddRoute

func (r *KVRouter[M]) AddRoute(client *HubClient[M], topicIds ...TopicIdType) error

func (*KVRouter[M]) Remove

func (r *KVRouter[M]) Remove(client *HubClient[M]) error

func (*KVRouter[M]) RemoveRoute

func (r *KVRouter[M]) RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error

func (*KVRouter[M]) RouteMessage

func (r *KVRouter[M]) RouteMessage(msg Message[M], source *HubClient[M]) error

type Router

type Router[M any] interface {
	// Called when a new client has joined the group
	Add(client *HubClient[M]) error

	// Called when a client has dropped off
	Remove(client *HubClient[M]) error

	AddRoute(client *HubClient[M], topicIds ...TopicIdType) error
	RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error
	RouteMessage(msg Message[M], source *HubClient[M]) error
}

*

  • In our hub the main problem is given a message, identifying
  • all clients it should be sent to. Instead of having a fixed
  • key -> HubClient[] mapping, it is easier to simply for each
  • message use a router delegate to identify all HubClients that
  • are candidates. This Router does that.

type TopicIdType

type TopicIdType interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL