conc

package
v0.0.31 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IDFunc added in v0.0.19

func IDFunc[T any](input T) T

Types

type Batcher added in v0.0.20

type Batcher[T any, U any] struct {
	FlushPeriod time.Duration
	Joiner      func(inputs []T) (outputs U)
	// contains filtered or unexported fields
}

func NewBatcher added in v0.0.20

func NewBatcher[T any, U any](inputChan chan T) *Batcher[T, U]

func NewIDBatcher added in v0.0.20

func NewIDBatcher[T any](inputChan chan T) *Batcher[T, []T]

func (*Batcher[T, U]) InputChannel added in v0.0.20

func (fo *Batcher[T, U]) InputChannel() chan<- T

func (*Batcher[T, U]) New added in v0.0.20

func (fo *Batcher[T, U]) New() chan U

func (*Batcher[T, U]) Remove added in v0.0.20

func (fo *Batcher[T, U]) Remove(output chan U)

func (*Batcher[T, U]) Send added in v0.0.20

func (fo *Batcher[T, U]) Send(value T)

func (*Batcher[T, U]) Stop added in v0.0.20

func (fo *Batcher[T, U]) Stop()

type BatcherCmd added in v0.0.21

type BatcherCmd[T any] struct {
	Name    string
	Channel chan T
}

type FanIn

type FanIn[T any] struct {
	// contains filtered or unexported fields
}

func NewFanIn added in v0.0.19

func NewFanIn[T any](outChan chan T) *FanIn[T]

func (*FanIn[T]) Add added in v0.0.19

func (fi *FanIn[T]) Add(inputs ...<-chan T)

func (*FanIn[T]) Channel added in v0.0.19

func (fi *FanIn[T]) Channel() chan T

func (*FanIn[T]) IsRunning added in v0.0.20

func (fi *FanIn[T]) IsRunning() bool

func (*FanIn[T]) Remove added in v0.0.19

func (fi *FanIn[T]) Remove(target <-chan T)

func (*FanIn[T]) Stop added in v0.0.19

func (fi *FanIn[T]) Stop()

type FanInCmd added in v0.0.21

type FanInCmd[T any] struct {
	Name           string
	AddedChannel   <-chan T
	RemovedChannel <-chan T
}

type FanOut

type FanOut[T any, U any] struct {
	Mapper func(inputs T) U
	// contains filtered or unexported fields
}

*

  • FanOut takes a message from one channel, applies a mapper function
  • and fans it out to N output channels.

func NewFanOut

func NewFanOut[T any, U any](inputChan chan T, mapper func(T) U) *FanOut[T, U]

func NewIDFanOut added in v0.0.20

func NewIDFanOut[T any](input chan T, mapper func(T) T) *FanOut[T, T]

*

  • Creates a new IDFanout bridge.

func (*FanOut[T, U]) InputChannel added in v0.0.19

func (fo *FanOut[T, U]) InputChannel() chan<- T

func (*FanOut[T, U]) IsRunning added in v0.0.20

func (fo *FanOut[T, U]) IsRunning() bool

func (*FanOut[T, U]) New added in v0.0.19

func (fo *FanOut[T, U]) New() <-chan U

func (*FanOut[T, U]) Remove added in v0.0.19

func (fo *FanOut[T, U]) Remove(output <-chan U)

func (*FanOut[T, U]) Send

func (fo *FanOut[T, U]) Send(value T)

func (*FanOut[T, U]) Stop added in v0.0.19

func (fo *FanOut[T, U]) Stop()

type FanOutCmd added in v0.0.21

type FanOutCmd[T any] struct {
	Name           string
	AddedChannel   chan T
	RemovedChannel <-chan T
}

type Hub added in v0.0.30

type Hub[M any] struct {
	// contains filtered or unexported fields
}

func NewHub added in v0.0.30

func NewHub[M any](router Router[M]) *Hub[M]

func (*Hub[M]) Connect added in v0.0.30

func (h *Hub[M]) Connect(writer HubWriter[M]) *HubClient[M]

func (*Hub[M]) Send added in v0.0.30

func (s *Hub[M]) Send(eventKey string, message M, callbackChan chan M) error

func (*Hub[M]) Stop added in v0.0.30

func (s *Hub[M]) Stop()

*

  • Stop the Hub and cleanup.

type HubClient added in v0.0.30

type HubClient[M any] struct {
	WriteMessage func(HubMessage[M]) error
	// contains filtered or unexported fields
}

func (*HubClient[M]) Disconnect added in v0.0.30

func (h *HubClient[M]) Disconnect()

func (*HubClient[M]) GetId added in v0.0.30

func (h *HubClient[M]) GetId() string

func (*HubClient[M]) Subscribe added in v0.0.30

func (h *HubClient[M]) Subscribe(eventKeys ...string)

func (*HubClient[M]) Unsubscribe added in v0.0.30

func (h *HubClient[M]) Unsubscribe(eventKeys ...string)

type HubControlEvent added in v0.0.30

type HubControlEvent[M any] struct {
	Client               *HubClient[M]
	Quit                 bool
	Pause                bool
	AddedSubscriptions   []string
	RemovedSubscriptions []string
}

type HubMessage added in v0.0.30

type HubMessage[M any] struct {
	EventKey string
	Message  M
	Callback chan M
}

type HubWriter added in v0.0.30

type HubWriter[M any] func(HubMessage[M]) error

type KVRouter added in v0.0.30

type KVRouter[M any] struct {
	// contains filtered or unexported fields
}

*

  • A type of router that maintains an eventKey -> Conn[] mapping,
  • keyed by fixed event keys.

func NewKVRouter added in v0.0.30

func NewKVRouter[M any]() *KVRouter[M]

func (*KVRouter[M]) AddRoute added in v0.0.30

func (r *KVRouter[M]) AddRoute(client *HubClient[M], eventKeys ...string) error

func (*KVRouter[M]) Remove added in v0.0.30

func (r *KVRouter[M]) Remove(client *HubClient[M]) error

func (*KVRouter[M]) RemoveRoute added in v0.0.30

func (r *KVRouter[M]) RemoveRoute(client *HubClient[M], eventKeys ...string) error

func (*KVRouter[M]) RouteMessage added in v0.0.30

func (r *KVRouter[M]) RouteMessage(msg HubMessage[M]) error

type Pipe

type Pipe[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewIDPipe added in v0.0.20

func NewIDPipe[T any](input <-chan T, output chan<- T, mapper func(T) T) *Pipe[T, T]

func NewPipe added in v0.0.19

func NewPipe[T any, U any](input <-chan T, output chan<- U, mapper func(T) U) *Pipe[T, U]

func (*Pipe[T, U]) Stop added in v0.0.19

func (p *Pipe[T, U]) Stop()

type ReaderChan

type ReaderChan[R any, D any] struct {
	ReaderWriterBase[R, D]

	Read func() (R, error)
	// contains filtered or unexported fields
}

func NewReader

func NewReader[R any, D any](read func() (R, error)) *ReaderChan[R, D]

func (*ReaderChan[R, D]) IsRunning

func (ch *ReaderChan[R, D]) IsRunning() bool

*

  • Returns whether the connection reader/writer loops are running.

func (*ReaderChan[R, D]) ResultChannel added in v0.0.19

func (rc *ReaderChan[R, D]) ResultChannel() chan ValueOrError[R]

*

  • Returns the conn's reader channel.

func (*ReaderChan[T, D]) Stop

func (ch *ReaderChan[T, D]) Stop() error

*

  • This method is called to stop the channel.
  • If already connected then nothing is done and nil is returned. If it
  • is not already connected, a connection will first be established
  • (including auth and refreshing tokens) and then the reader and
  • writers are started. SendRequest can be called to send requests
  • to the peer and the (user provided) msgChannel will be used to
  • handle messages from the server.

type ReaderWriterBase

type ReaderWriterBase[T any, D any] struct {
	Info D
	// Time allowed to read the next pong message from the peer.
	WaitTime time.Duration
	// contains filtered or unexported fields
}

func (*ReaderWriterBase[T, D]) WaitForFinish

func (rwb *ReaderWriterBase[T, D]) WaitForFinish()

*

  • Waits until the socket connection is disconnected or manually stopped.

type Router added in v0.0.30

type Router[M any] interface {
	AddRoute(client *HubClient[M], eventKeys ...string) error
	RemoveRoute(client *HubClient[M], eventKeys ...string) error
	Remove(client *HubClient[M]) error
	RouteMessage(HubMessage[M]) error
}

*

  • In our streamer the main problem is given a message, identifying
  • all connections it should be sent to. Instead of having a fixed
  • key -> HubClient[] mapping, it is easier to simply use a router
  • delegate for each message to identify all HubClients that
  • are candidates. This Router does that.

type ValueOrError

type ValueOrError[T any] struct {
	Value T
	Error error
}

type WriterChan

type WriterChan[W any, D any] struct {
	ReaderWriterBase[W, D]

	Write   func(msg W) error
	OnClose func()
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter[W any, D any](write func(msg W) error) *WriterChan[W, D]

func (*WriterChan[W, D]) IsRunning

func (ch *WriterChan[W, D]) IsRunning() bool

*

  • Returns whether the connection reader/writer loops are running.

func (*WriterChan[W, D]) Send

func (wc *WriterChan[W, D]) Send(req W) bool

func (*WriterChan[T, D]) Stop

func (ch *WriterChan[T, D]) Stop() error

*

  • This method is called to stop the channel.
  • If already connected then nothing is done and nil is returned. If it
  • is not already connected, a connection will first be established
  • (including auth and refreshing tokens) and then the reader and
  • writers are started. SendRequest can be called to send requests
  • to the peer and the (user provided) msgChannel will be used to
  • handle messages from the server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL