conc

package
v0.0.47 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	InvalidHubClientError = errors.New("A hub client must have either a reader or a writer or both")
)

Functions

func IDFunc added in v0.0.19

func IDFunc[T any](input T) T

Types

type Batcher added in v0.0.20

type Batcher[T any, U any] struct {
	FlushPeriod time.Duration
	Joiner      func(inputs []T) (outputs U)
	// contains filtered or unexported fields
}

func NewBatcher added in v0.0.20

func NewBatcher[T any, U any](inputChan chan T) *Batcher[T, U]

func NewIDBatcher added in v0.0.20

func NewIDBatcher[T any](inputChan chan T) *Batcher[T, []T]

func (*Batcher[T, U]) InputChannel added in v0.0.20

func (fo *Batcher[T, U]) InputChannel() chan<- T

func (*Batcher[T, U]) New added in v0.0.20

func (fo *Batcher[T, U]) New() chan U

func (*Batcher[T, U]) Remove added in v0.0.20

func (fo *Batcher[T, U]) Remove(output chan U)

func (*Batcher[T, U]) Send added in v0.0.20

func (fo *Batcher[T, U]) Send(value T)

func (*Batcher[T, U]) Stop added in v0.0.20

func (fo *Batcher[T, U]) Stop()

type BatcherCmd added in v0.0.21

type BatcherCmd[T any] struct {
	Name    string
	Channel chan T
}

type Broadcaster added in v0.0.33

type Broadcaster[M any] struct {
	// contains filtered or unexported fields
}

A type of router that forwards a message to *all* its
connected clients (except the one it originated from).

func NewBroadcaster added in v0.0.33

func NewBroadcaster[M any]() *Broadcaster[M]

func (*Broadcaster[M]) Add added in v0.0.33

func (r *Broadcaster[M]) Add(client *HubClient[M]) error

func (*Broadcaster[M]) AddRoute added in v0.0.33

func (r *Broadcaster[M]) AddRoute(client *HubClient[M], topicIds ...TopicIdType) error

func (*Broadcaster[M]) Remove added in v0.0.33

func (r *Broadcaster[M]) Remove(client *HubClient[M]) error

func (*Broadcaster[M]) RemoveRoute added in v0.0.33

func (r *Broadcaster[M]) RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error

func (*Broadcaster[M]) RouteMessage added in v0.0.33

func (r *Broadcaster[M]) RouteMessage(msg Message[M], source *HubClient[M]) error

type Connector added in v0.0.37

type Connector[M any] struct {

	// Called when a new message is received
	OnMessage func(msg Message[M]) error

	// Called when the connection is closed and quit (won't be called on reconnects)
	OnClose func()
	// contains filtered or unexported fields
}

func NewConnector added in v0.0.37

func NewConnector[M any](connect func() error, read ReaderFunc[M]) *Connector[M]

Encapsulates a connection object which connects and continuously
reads out of it. This connection also has other facilities like
reconnecting on failures or closes (by calling the connect
method again) with retries and backoffs etc.

func (*Connector[M]) Start added in v0.0.37

func (c *Connector[M]) Start()

func (*Connector[M]) Stop added in v0.0.37

func (c *Connector[M]) Stop()

type FanIn

type FanIn[T any] struct {
	RunnerBase[FanInCmd[T]]
	// Called when a channel is removed so the caller can
	// perform other cleanups etc based on this
	OnChannelRemoved func(fi *FanIn[T], inchan <-chan T)
	// contains filtered or unexported fields
}

func NewFanIn added in v0.0.19

func NewFanIn[T any](outChan chan T) *FanIn[T]

func (*FanIn[T]) Add added in v0.0.19

func (fi *FanIn[T]) Add(inputs ...<-chan T)

func (*FanIn[T]) Count added in v0.0.37

func (fi *FanIn[T]) Count() int

func (*FanIn[T]) RecvChan added in v0.0.37

func (fi *FanIn[T]) RecvChan() chan T

func (*FanIn[T]) Remove added in v0.0.19

func (fi *FanIn[T]) Remove(target <-chan T)

Remove an input channel from our monitor list.

type FanInCmd added in v0.0.21

type FanInCmd[T any] struct {
	Name           string
	AddedChannel   <-chan T
	RemovedChannel <-chan T
}

type FanOut

type FanOut[T any, U any] struct {
	RunnerBase[FanOutCmd[T, U]]
	Mapper func(inputs T) U
	// contains filtered or unexported fields
}

FanOut takes a message from one channel, applies a mapper function
and fans it out to N output channels.

func NewFanOut

func NewFanOut[T any, U any](inputChan chan T, mapper func(T) U) *FanOut[T, U]

func NewIDFanOut added in v0.0.20

func NewIDFanOut[T any](input chan T, mapper func(T) T) *FanOut[T, T]

Creates a new IDFanOut bridge.

func (*FanOut[T, U]) Add added in v0.0.37

func (fo *FanOut[T, U]) Add(output chan<- U, filter func(T) bool)

Adds a new fan out receiver with an optional filter method.
The filter method can be used to filter out messages to certain
listeners if necessary.

func (*FanOut[T, U]) Count added in v0.0.37

func (fo *FanOut[T, U]) Count() int

func (*FanOut[T, U]) New added in v0.0.19

func (fo *FanOut[T, U]) New(filter func(T) bool) chan U

func (*FanOut[T, U]) Remove added in v0.0.19

func (fo *FanOut[T, U]) Remove(output chan<- U)

func (*FanOut[T, U]) Send

func (fo *FanOut[T, U]) Send(value T)

func (*FanOut[T, U]) SendChan added in v0.0.37

func (fo *FanOut[T, U]) SendChan() <-chan T

type FanOutCmd added in v0.0.21

type FanOutCmd[T any, U any] struct {
	Name           string
	Filter         FilterFunc[T]
	SelfOwned      bool
	AddedChannel   chan<- U
	RemovedChannel chan<- U
}

type FilterFunc added in v0.0.33

type FilterFunc[T any] func(T) bool

type Hub added in v0.0.30

type Hub[M any] struct {
	// contains filtered or unexported fields
}

func NewHub added in v0.0.30

func NewHub[M any](router Router[M]) *Hub[M]

func (*Hub[M]) Connect added in v0.0.30

func (h *Hub[M]) Connect(reader HubReader[M], writer HubWriter[M]) (*HubClient[M], error)

func (*Hub[M]) Send added in v0.0.30

func (s *Hub[M]) Send(message Message[M], callbackChan chan M) error

func (*Hub[M]) Stop added in v0.0.30

func (s *Hub[M]) Stop()

Stop the Hub and cleanup.

type HubClient added in v0.0.30

type HubClient[M any] struct {
	Write   HubWriter[M]
	Read    HubReader[M]
	OnClose func()
	// contains filtered or unexported fields
}

func (*HubClient[M]) Disconnect added in v0.0.30

func (h *HubClient[M]) Disconnect()

func (*HubClient[M]) GetId added in v0.0.30

func (h *HubClient[M]) GetId() string

func (*HubClient[M]) Subscribe added in v0.0.30

func (h *HubClient[M]) Subscribe(topicIds ...TopicIdType)

func (*HubClient[M]) Unsubscribe added in v0.0.30

func (h *HubClient[M]) Unsubscribe(topicIds ...TopicIdType)

type HubControlEvent added in v0.0.30

type HubControlEvent[M any] struct {
	Client               *HubClient[M]
	Quit                 bool
	Pause                bool
	AddedSubscriptions   []TopicIdType
	RemovedSubscriptions []TopicIdType
}

type HubMessage added in v0.0.30

type HubMessage[M any] struct {
	Message  Message[M]
	Callback chan M
}

type HubReader added in v0.0.37

type HubReader[M any] func() Message[M]

A function for reading from a hub

type HubWriter added in v0.0.30

type HubWriter[M any] func(Message[M]) error

A function that can write to a particular hub

type Input added in v0.0.37

type Input[T any] struct {
	// contains filtered or unexported fields
}

type KVRouter added in v0.0.30

type KVRouter[M any] struct {
	// contains filtered or unexported fields
}

A type of router that maintains a topicId -> Conn[] mapping
for a fixed set of topics.

func NewKVRouter added in v0.0.30

func NewKVRouter[M any](msgToKey func(msg M) TopicIdType) *KVRouter[M]

func (*KVRouter[M]) Add added in v0.0.33

func (r *KVRouter[M]) Add(client *HubClient[M]) error

func (*KVRouter[M]) AddRoute added in v0.0.30

func (r *KVRouter[M]) AddRoute(client *HubClient[M], topicIds ...TopicIdType) error

func (*KVRouter[M]) Remove added in v0.0.30

func (r *KVRouter[M]) Remove(client *HubClient[M]) error

func (*KVRouter[M]) RemoveRoute added in v0.0.30

func (r *KVRouter[M]) RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error

func (*KVRouter[M]) RouteMessage added in v0.0.30

func (r *KVRouter[M]) RouteMessage(msg Message[M], source *HubClient[M]) error

type Message added in v0.0.37

type Message[T any] struct {
	Value  T
	Error  error
	Source interface{}
}

type Pipe

type Pipe[T any] struct {
	RunnerBase[string]

	OnDone func(p *Pipe[T])
	// contains filtered or unexported fields
}

func NewPipe added in v0.0.19

func NewPipe[T any](input <-chan T, output chan<- T) *Pipe[T]

type Reader added in v0.0.37

type Reader[R any] struct {
	RunnerBase[string]

	Read   ReaderFunc[R]
	OnDone func(r *Reader[R])
	// contains filtered or unexported fields
}

func NewReader

func NewReader[R any](read ReaderFunc[R]) *Reader[R]

func (*Reader[R]) RecvChan added in v0.0.37

func (rc *Reader[R]) RecvChan() <-chan Message[R]

Returns the conn's reader channel.

type ReaderFunc added in v0.0.37

type ReaderFunc[R any] func() (R, error)

type Router added in v0.0.30

type Router[M any] interface {
	// Called when a new client has joined the group
	Add(client *HubClient[M]) error

	// Called when a client has dropped off
	Remove(client *HubClient[M]) error

	AddRoute(client *HubClient[M], topicIds ...TopicIdType) error
	RemoveRoute(client *HubClient[M], topicIds ...TopicIdType) error
	RouteMessage(msg Message[M], source *HubClient[M]) error
}

In our hub the main problem is, given a message, identifying
all clients it should be sent to. Instead of having a fixed
key -> HubClient[] mapping, it is easier to use a router
delegate that, for each message, identifies all HubClients that
are candidates. This Router does that.

type RunnerBase added in v0.0.37

type RunnerBase[C any] struct {
	// contains filtered or unexported fields
}

func NewRunnerBase added in v0.0.37

func NewRunnerBase[C any](stopVal C) RunnerBase[C]

func (*RunnerBase[C]) IsRunning added in v0.0.37

func (r *RunnerBase[C]) IsRunning() bool

func (*RunnerBase[C]) Stop added in v0.0.37

func (r *RunnerBase[C]) Stop() error

This method is called to stop the runner. It is up to the child classes
to listen to messages on the control channel and initiate the wind-down
and cleanup process.

type TopicIdType added in v0.0.36

type TopicIdType interface{}

type Writer added in v0.0.37

type Writer[W any] struct {
	RunnerBase[string]

	Write WriterFunc[W]
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter[W any](write WriterFunc[W]) *Writer[W]

func (*Writer[W]) Send added in v0.0.37

func (wc *Writer[W]) Send(req W) bool

func (*Writer[W]) SendChan added in v0.0.37

func (wc *Writer[W]) SendChan() chan W

type WriterFunc added in v0.0.37

type WriterFunc[W any] func(W) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL