Documentation
¶
Overview ¶
The conc package contains a few utilities for basic concurrency patterns that have wide uses.
Index ¶
- func IDFunc[T any](input T) T
- type FanIn
- type FanOut
- func (fo *FanOut[T]) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
- func (fo *FanOut[T]) Count() int
- func (fo *FanOut[T]) DebugInfo() any
- func (fo *FanOut[T]) New(filter FilterFunc[T]) chan T
- func (fo *FanOut[T]) Remove(output chan<- T, wait bool) (callbackChan chan error)
- func (fo *FanOut[T]) Send(value T)
- func (fo *FanOut[T]) SendChan() <-chan T
- type FilterFunc
- type Map
- func (m *Map[K, V]) Delete(k K)
- func (m *Map[K, V]) Get(k K) (V, bool)
- func (m *Map[K, V]) Has(k K) bool
- func (m *Map[K, V]) LDelete(k K, lock bool)
- func (m *Map[K, V]) LGet(k K, lock bool) (V, bool)
- func (m *Map[K, V]) LHas(k K, lock bool) bool
- func (m *Map[K, V]) LRange(lock bool, meth func(K, V) bool)
- func (m *Map[K, V]) LSet(k K, v V, lock bool)
- func (m *Map[K, V]) Lock()
- func (m *Map[K, V]) RLock()
- func (m *Map[K, V]) RUnlock()
- func (m *Map[K, V]) Range(meth func(K, V) bool)
- func (m *Map[K, V]) Set(k K, v V)
- func (m *Map[K, V]) Unlock()
- func (m *Map[K, V]) Update(actions func(items map[K]V))
- func (m *Map[K, V]) View(actions func())
- type Mapper
- type Message
- type Reader
- type ReaderFunc
- type Reducer
- type RunnerBase
- type Writer
- type WriterFunc
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type FanIn ¶
type FanIn[T any] struct { RunnerBase[fanInCmd[T]] // Called when a channel is removed so the caller can // perform other cleanups etc based on this OnChannelRemoved func(fi *FanIn[T], inchan <-chan T) // contains filtered or unexported fields }
Example ¶
// Create 5 input channels and send 5 numbers into them // the collector channel fanin := NewFanIn[int](nil) defer fanin.Stop() NUM_CHANS := 2 NUM_MSGS := 3 var inchans []chan int for i := 0; i < NUM_CHANS; i++ { inchan := make(chan int) inchans = append(inchans, inchan) fanin.Add(inchan) } for i := 0; i < NUM_CHANS; i++ { go func(inchan chan int) { // send some numbers into this fanin for j := 0; j < NUM_MSGS; j++ { inchan <- j } }(inchans[i]) } // collect the fanned values var vals []int for i := 0; i < NUM_CHANS*NUM_MSGS; i++ { val := <-fanin.RecvChan() vals = append(vals, val) } // sort and print them for testing sort.Ints(vals) for _, v := range vals { fmt.Println(v) }
Output: 0 0 1 1 2 2
type FanOut ¶
type FanOut[T any] struct { RunnerBase[fanOutCmd[T]] // In the default mode, the Send method simply writes to an input channel that is read // by the runner loop of this FanOut. As soon as an event is read, it by default sequentially // writes to all output channels. If the output channels are not being drained by the reader // goroutine (in 2 above) then the Send method will block. // In other words, if the reader goroutine is NOT running before the Send method is invoked // OR if the reader goroutine is blocked for some reason, then the Send method will block. // To prevent this set the async flag to true in the Send method to true so that writes to // the reader goroutines are themselves asynchronous and non blocking. // // By setting this flag to true, writes to th output channels will happen synchronously without // invoking a new goroutine. This will help reduce number of goroutines kicked off during dispatch // and is is an optimization if callers/owners of this FanOut want to exercise fine control over the // reader channels and goroutines. For example the caller might create buffered output channels so // writes are blocked, or the caller themselves may be running the readers in seperate goroutines // to prevent any blocking behavior. SendSync bool // contains filtered or unexported fields }
FanOut takes a message from one chanel, applies a mapper function and fans it out to N output channels.
The general pattern is to:
- Create a FanOut[T] with the NewFanOut method
- Start a reader goroutine that reads values from fanout channels (note this SHOULD be started by any values are sent on the input channel)
- Start sending values through the input channel via the Send method.
Example ¶
// Create a fanout wiht 5 output channels and see that // numbers sent into the output are read from all of these fanout := NewFanOut[int](nil) defer fanout.Stop() NUM_CHANS := 2 NUM_MSGS := 3 // Add some receiver channels var outchans []chan int for i := 0; i < NUM_CHANS; i++ { outchan := fanout.New(nil) outchans = append(outchans, outchan) } var vals []int for i := 0; i < NUM_MSGS; i++ { fanout.Send(i) } // wait till all fanouts have been collected for j := 0; j < NUM_MSGS; j++ { for i := 0; i < NUM_CHANS; i++ { val := <-outchans[i] vals = append(vals, val) } } // sort and print them for testing sort.Ints(vals) for _, v := range vals { fmt.Println(v) }
Output: 0 0 1 1 2 2
func NewFanOut ¶
Creates a new typed FanOut runner. Every FanOut needs an inputChan from which messages can be read to fan out to listening channels. Ths inputChan can be owned by this FanOut or can be provided by the caller. If the input channel is provided then it is not closed when the FanOut runner terminates (or is stoopped).
func (*FanOut[T]) Add ¶ added in v0.0.37
func (fo *FanOut[T]) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
Adds a new channel to which incoming messages will be fanned out to. These output channels can be either added by the caller or created by this runner. If the output channel was passed, then it wont be closed when this runner finishes (or is stopped). A filter function can also be passed on a per output channel basis that can either transform or filter messages specific to this output channel. For example filters can be used to check permissions for an incoming message wrt to an output channel.
Output channels are added to our list of listeners asynchronously. The wait parameter if set to true will return a channel that can be read from to ensure that this output channel registration is synchronous.
func (*FanOut[T]) Count ¶ added in v0.0.37
Returns the number of listening channels currently running.
func (*FanOut[T]) New ¶ added in v0.0.19
func (fo *FanOut[T]) New(filter FilterFunc[T]) chan T
Adds a new output channel with an optional filter function that will be managed by this runner.
func (*FanOut[T]) Remove ¶ added in v0.0.19
Removes an output channel from our list of listeners. If the channel was managed/owned by this runner then it will also be closed. Just like the Add method, Removals are asynchronous. This can be made synchronized by passing wait=true.
type FilterFunc ¶ added in v0.0.33
type FilterFunc[T any] func(*T) *T
FanOuts lets a message to be fanned-out to multiple channels. Optionally the message can also be transformed (or filtered) before fanning out to the listeners.
type Map ¶ added in v0.0.61
type Map[K comparable, V any] struct { // contains filtered or unexported fields }
A synchronized map with read and update lock capabilities
func NewMap ¶ added in v0.0.61
func NewMap[K comparable, V any]() (out *Map[K, V])
Creates a new lockable map
func (*Map[K, V]) Delete ¶ added in v0.0.61
func (m *Map[K, V]) Delete(k K)
Locks the map to delete a given key.
func (*Map[K, V]) LDelete ¶ added in v0.0.93
Deletes the value by a given key. Optionally obtains a write lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) LGet ¶ added in v0.0.93
Gets the value by a given key. Optionally obtains a read lock if requested. A lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) LHas ¶ added in v0.0.93
Check if the map contains an entry by key. Optionally obtains a read lock if requested. A lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) LRange ¶ added in v0.0.93
Iterates over the items in this map. Optionally obtains a read lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) LSet ¶ added in v0.0.93
Sets the value by a given key. Optionally obtains a write lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) Lock ¶ added in v0.0.61
func (m *Map[K, V]) Lock()
Obtains a write lock on the map
func (*Map[K, V]) RLock ¶ added in v0.0.61
func (m *Map[K, V]) RLock()
Obtains a read lock on the map
func (*Map[K, V]) RUnlock ¶ added in v0.0.61
func (m *Map[K, V]) RUnlock()
Relinquishes a read lock on the map
func (*Map[K, V]) Set ¶ added in v0.0.61
func (m *Map[K, V]) Set(k K, v V)
Locks the map to set the value of a given key.
func (*Map[K, V]) Unlock ¶ added in v0.0.61
func (m *Map[K, V]) Unlock()
Relinquishes a write lock on the map
type Mapper ¶ added in v0.0.99
type Mapper[I any, O any] struct { RunnerBase[string] // MapFunc is applied to each value in the input channel // and returns a tuple of 3 things - outval, skip, stop // if skip is false, outval is sent to the output channel // if stop is true, then the entire mapper stops processing any further elements. // This mechanism can be used inaddition to the Stop method if sequencing this // within the elements of input channel is required MapFunc func(I) (O, bool, bool) OnDone func(p *Mapper[I, O]) // contains filtered or unexported fields }
Mappers connect an input and output channel applying transforms between them
func NewMapper ¶ added in v0.0.99
func NewMapper[T any, U any](input <-chan T, output chan<- U, mapper func(T) (U, bool, bool)) *Mapper[T, U]
Creates a new mapper between an input and output channel. The ownership of the channels is by the caller and not the Mapper. Hence they will nto be called when the mapper stops.
type Reader ¶ added in v0.0.37
type Reader[R any] struct { RunnerBase[string] Read ReaderFunc[R] OnDone func(r *Reader[R]) // contains filtered or unexported fields }
The typed Reader goroutine which calls a Read method to return data over a channel.
func NewReader ¶
func NewReader[R any](read ReaderFunc[R]) *Reader[R]
Creates a new reader instance. Just like time.Ticker, this initializer also starts the Reader loop. It is upto the caller to Stop this reader when done with. Not doing so can risk the reader to run indefinitely.
type ReaderFunc ¶ added in v0.0.37
Type of the reader method used by the Reader goroutine primitive.
type Reducer ¶ added in v0.0.98
type Reducer[T any, U any] struct { FlushPeriod time.Duration ReduceFunc func(inputs []T) (outputs U) // contains filtered or unexported fields }
Reducer is a way to collect messages of type T in some kind of window and reduce them to type U. For example this could be used to batch messages into a list every 10 seconds. Alternatively if a time based window is not used a reduction can be invokved manually.
func NewIDReducer ¶ added in v0.0.98
A Reducer that simply collects events of type T into a list (of type []T)
func NewReducer ¶ added in v0.0.98
The reducer over generic input and output types. The input channel can be provided on which the reducer will read messages. If an input channel is not provided then the reducer will create one (and own its lifecycle). Just like other runners, the Reducer starts as soon as it is created.
func (*Reducer[T, U]) Send ¶ added in v0.0.98
func (fo *Reducer[T, U]) Send(value T)
Send a mesasge/value onto this reducer for (eventual) reduction.
type RunnerBase ¶ added in v0.0.37
type RunnerBase[C any] struct { // contains filtered or unexported fields }
Base of the Reader and Writer primitives
func NewRunnerBase ¶ added in v0.0.37
func NewRunnerBase[C any](stopVal C) RunnerBase[C]
Creates a new base runner - called by the Reader and Writer primitives
func (*RunnerBase[R]) DebugInfo ¶ added in v0.0.80
func (r *RunnerBase[R]) DebugInfo() any
Used for returning any debug information.
func (*RunnerBase[C]) IsRunning ¶ added in v0.0.37
func (r *RunnerBase[C]) IsRunning() bool
Returns true if currently running otherwise false
func (*RunnerBase[C]) Stop ¶ added in v0.0.37
func (r *RunnerBase[C]) Stop() error
This method is called to stop the runner. It is upto the child classes to listen to messages on the control channel and initiate the wind-down and cleanup process.
type Writer ¶ added in v0.0.37
type Writer[W any] struct { RunnerBase[string] Write WriterFunc[W] // contains filtered or unexported fields }
The typed Writer goroutine type which calls the Write method when it serializes it writes.
func NewWriter ¶
func NewWriter[W any](write WriterFunc[W]) *Writer[W]
Creates a new writer instance. Just like time.Ticker, this initializer also starts the Writer loop. It is upto the caller to Stop this writer when done with. Not doing so can risk the writer to run indefinitely.
type WriterFunc ¶ added in v0.0.37
Type of the writer method used by the writer goroutine primitive to serialize its writes.