conc

package
v0.0.25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IDFunc added in v0.0.19

func IDFunc[T any](input T) T

Types

type Batcher added in v0.0.20

type Batcher[T any, U any] struct {
	FlushPeriod time.Duration
	Joiner      func(inputs []T) (outputs U)
	// contains filtered or unexported fields
}

func NewBatcher added in v0.0.20

func NewBatcher[T any, U any](inputChan chan T) *Batcher[T, U]

func NewIDBatcher added in v0.0.20

func NewIDBatcher[T any](inputChan chan T) *Batcher[T, []T]

func (*Batcher[T, U]) InputChannel added in v0.0.20

func (fo *Batcher[T, U]) InputChannel() chan<- T

func (*Batcher[T, U]) New added in v0.0.20

func (fo *Batcher[T, U]) New() chan U

func (*Batcher[T, U]) Remove added in v0.0.20

func (fo *Batcher[T, U]) Remove(output chan U)

func (*Batcher[T, U]) Send added in v0.0.20

func (fo *Batcher[T, U]) Send(value T)

func (*Batcher[T, U]) Stop added in v0.0.20

func (fo *Batcher[T, U]) Stop()

type BatcherCmd added in v0.0.21

type BatcherCmd[T any] struct {
	Name    string
	Channel chan T
}

type FanIn

type FanIn[T any] struct {
	// contains filtered or unexported fields
}

func NewFanIn added in v0.0.19

func NewFanIn[T any](outChan chan T) *FanIn[T]

func (*FanIn[T]) Add added in v0.0.19

func (fi *FanIn[T]) Add(inputs ...<-chan T)

func (*FanIn[T]) Channel added in v0.0.19

func (fi *FanIn[T]) Channel() chan T

func (*FanIn[T]) IsRunning added in v0.0.20

func (fi *FanIn[T]) IsRunning() bool

func (*FanIn[T]) Remove added in v0.0.19

func (fi *FanIn[T]) Remove(target <-chan T)

func (*FanIn[T]) Stop added in v0.0.19

func (fi *FanIn[T]) Stop()

type FanInCmd added in v0.0.21

type FanInCmd[T any] struct {
	Name           string
	AddedChannel   <-chan T
	RemovedChannel <-chan T
}

type FanOut

type FanOut[T any, U any] struct {
	Mapper func(inputs T) U
	// contains filtered or unexported fields
}

func NewFanOut

func NewFanOut[T any, U any](inputChan chan T, mapper func(T) U) *FanOut[T, U]

func NewIDFanOut added in v0.0.20

func NewIDFanOut[T any](input chan T, mapper func(T) T) *FanOut[T, T]

func (*FanOut[T, U]) InputChannel added in v0.0.19

func (fo *FanOut[T, U]) InputChannel() chan<- T

func (*FanOut[T, U]) IsRunning added in v0.0.20

func (fo *FanOut[T, U]) IsRunning() bool

func (*FanOut[T, U]) New added in v0.0.19

func (fo *FanOut[T, U]) New() <-chan U

func (*FanOut[T, U]) Remove added in v0.0.19

func (fo *FanOut[T, U]) Remove(output <-chan U)

func (*FanOut[T, U]) Send

func (fo *FanOut[T, U]) Send(value T)

func (*FanOut[T, U]) Stop added in v0.0.19

func (fo *FanOut[T, U]) Stop()

type FanOutCmd added in v0.0.21

type FanOutCmd[T any] struct {
	Name           string
	AddedChannel   chan T
	RemovedChannel <-chan T
}

type Pipe

type Pipe[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewIDPipe added in v0.0.20

func NewIDPipe[T any](input <-chan T, output chan<- T, mapper func(T) T) *Pipe[T, T]

func NewPipe added in v0.0.19

func NewPipe[T any, U any](input <-chan T, output chan<- U, mapper func(T) U) *Pipe[T, U]

func (*Pipe[T, U]) Stop added in v0.0.19

func (p *Pipe[T, U]) Stop()

type ReaderChan

type ReaderChan[R any, D any] struct {
	ReaderWriterBase[R, D]

	Read func() (R, error)
	// contains filtered or unexported fields
}

func NewReader

func NewReader[R any, D any](read func() (R, error)) *ReaderChan[R, D]

func (*ReaderChan[R, D]) IsRunning

func (ch *ReaderChan[R, D]) IsRunning() bool

*

  • Returns whether the connection reader/writer loops are running.

func (*ReaderChan[R, D]) ResultChannel added in v0.0.19

func (rc *ReaderChan[R, D]) ResultChannel() chan ValueOrError[R]

*

  • Returns the conn's reader channel.

func (*ReaderChan[T, D]) Stop

func (ch *ReaderChan[T, D]) Stop() error

*

  • This method is called to stop the channel
  • If already connected then nothing is done and nil
  • is not already connected, a connection will first be established
  • (including auth and refreshing tokens) and then the reader and
  • writers are started. SendRequest can be called to send requests
  • to the peer and the (user provided) msgChannel will be used to
  • handle messages from the server.

type ReaderWriterBase

type ReaderWriterBase[T any, D any] struct {
	Info D
	// Time allowed to read the next pong message from the peer.
	WaitTime time.Duration
	// contains filtered or unexported fields
}

func (*ReaderWriterBase[T, D]) WaitForFinish

func (rwb *ReaderWriterBase[T, D]) WaitForFinish()

*

  • Waits until the socket connection is disconnected or manually stopped.

type ValueOrError

type ValueOrError[T any] struct {
	Value T
	Error error
}

type WriterChan

type WriterChan[W any, D any] struct {
	ReaderWriterBase[W, D]

	Write   func(msg W) error
	OnClose func()
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter[W any, D any](write func(msg W) error) *WriterChan[W, D]

func (*WriterChan[W, D]) IsRunning

func (ch *WriterChan[W, D]) IsRunning() bool

*

  • Returns whether the connection reader/writer loops are running.

func (*WriterChan[W, D]) Send

func (wc *WriterChan[W, D]) Send(req W) bool

func (*WriterChan[T, D]) Stop

func (ch *WriterChan[T, D]) Stop() error

*

  • This method is called to stop the channel
  • If already connected then nothing is done and nil
  • is not already connected, a connection will first be established
  • (including auth and refreshing tokens) and then the reader and
  • writers are started. SendRequest can be called to send requests
  • to the peer and the (user provided) msgChannel will be used to
  • handle messages from the server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL