concurrent

package
v0.2024.5
Warning

This package is not in the latest version of its module.

Published: Jun 5, 2024 License: GPL-3.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BreakBackpressure

func BreakBackpressure[T any](in <-chan T) <-chan T

BreakBackpressure will wrap a channel to break backpressure between its input and output. It will drop incoming messages when the consumer can't process them fast enough. When the consumer receives, it will always get the most recent message sent by the producer.
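The "latest message wins" behavior described above can be illustrated with a small self-contained sketch. The function `latestOnly` and the helper `demoLatest` are hypothetical names written for this example; this is not the package's implementation, only one way such a wrapper could plausibly be built. It assumes that a value still pending when the input closes should be delivered before the output closes.

```go
package main

import "fmt"

// latestOnly is a hypothetical stand-in written for this example; it is not
// the package's BreakBackpressure. It never blocks the producer: when the
// consumer is slow, a newer value simply overwrites the pending one.
func latestOnly[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var latest T
		pending := false
		for {
			// A nil channel disables the send case, so a value is
			// only offered when one is actually pending.
			var send chan T
			if pending {
				send = out
			}
			select {
			case v, ok := <-in:
				if !ok {
					// Assumption: flush the last pending value on close.
					if pending {
						out <- latest
					}
					return
				}
				latest, pending = v, true
			case send <- latest:
				pending = false
			}
		}
	}()
	return out
}

// demoLatest sends three values before the consumer starts receiving,
// then collects everything the consumer sees.
func demoLatest() []int {
	in := make(chan int)
	out := latestOnly(in)
	for i := 1; i <= 3; i++ {
		in <- i // returns as soon as the wrapper goroutine takes the value
	}
	close(in)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoLatest()) // prints [3]
}
```

Because the consumer only starts receiving after all three sends, the values 1 and 2 are overwritten and only 3 is delivered.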

func BreakBackpressureBuffered

func BreakBackpressureBuffered[T any](in <-chan T, size int) <-chan T

Types

type Bus

type Bus[T any] struct {
	// contains filtered or unexported fields
}

func (*Bus[T]) Listen

func (b *Bus[T]) Listen(ctx context.Context, backpressure bool, bufferSize int) <-chan T

Listen subscribes to messages sent over the bus. Listen has two modes of operation.

With backpressure, all events are sent directly over the returned channel. If the receiver blocks, this exerts backpressure on the senders, which also block. In this mode, bufferSize sets the buffer size of the channel; any value of 0 or more is permitted.

Without backpressure, the receiver never causes the sender to block. Instead, the bus buffers the bufferSize most recent messages sent over it; the buffer size must be 1 or more. If the receiver does not receive often enough, older messages are silently discarded.
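The mode without backpressure can be pictured as a bounded queue that discards its oldest entry when full. The sketch below is an illustration under that assumption, not the package's code; `keepRecent` and `demoRecent` are hypothetical names, and clamping `size` to 1 is a choice made only for this example.

```go
package main

import "fmt"

// keepRecent is a hypothetical helper for this example. It buffers at most
// size of the most recent values from in and never blocks the producer.
func keepRecent[T any](in <-chan T, size int) <-chan T {
	if size < 1 {
		size = 1 // assumption for this sketch: clamp instead of failing
	}
	out := make(chan T)
	go func() {
		defer close(out)
		queue := make([]T, 0, size)
		for {
			// Offer the head of the queue only when the queue is non-empty.
			var send chan T
			var head T
			if len(queue) > 0 {
				send, head = out, queue[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					// Assumption: deliver what is still buffered, then close.
					for _, q := range queue {
						out <- q
					}
					return
				}
				if len(queue) == size {
					// Buffer full: silently discard the oldest message.
					copy(queue, queue[1:])
					queue = queue[:len(queue)-1]
				}
				queue = append(queue, v)
			case send <- head:
				copy(queue, queue[1:])
				queue = queue[:len(queue)-1]
			}
		}
	}()
	return out
}

// demoRecent sends five values into a buffer of two before anything is
// received, then collects what the consumer eventually sees.
func demoRecent() []int {
	in := make(chan int)
	out := keepRecent(in, 2)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoRecent()) // prints [4 5]
}
```

The mode with backpressure needs no such machinery: an ordinary channel created with `make(chan T, bufferSize)` already blocks the sender once its buffer is full.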

func (*Bus[T]) Send

func (b *Bus[T]) Send(ctx context.Context, event T) (ok bool)

type Map

type Map[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewMap

func NewMap[K comparable, V any]() *Map[K, V]

func (*Map[K, V]) CloneAndEvents

func (m *Map[K, V]) CloneAndEvents(ctx context.Context, backpressure bool, bufferSize int) (map[K]V, <-chan MapEvent[K, V])

func (*Map[K, V]) Copy

func (m *Map[K, V]) Copy() map[K]V

Copy creates a shallow copy of the map as a native Go map.

func (*Map[K, V]) Delete

func (m *Map[K, V]) Delete(ctx context.Context, key K) (old V, present bool, sent bool)

func (*Map[K, V]) Events

func (m *Map[K, V]) Events(ctx context.Context, backpressure bool, bufferSize int) <-chan MapEvent[K, V]

func (*Map[K, V]) Get

func (m *Map[K, V]) Get(key K) (value V, present bool)

Get will retrieve the value corresponding to the given key. If the key is not present, Get returns the zero value of V and present is false.

func (*Map[K, V]) GetOrStore

func (m *Map[K, V]) GetOrStore(ctx context.Context, key K, store V) (value V, present bool, sent bool)

GetOrStore will retrieve an entry in the map if it exists, otherwise creating it with the provided value.

func (*Map[K, V]) Store

func (m *Map[K, V]) Store(ctx context.Context, key K, value V) (old V, present bool, sent bool)

Store will store the value into the map under the given key. If there is an existing entry for the key, the old value is replaced and returned. If the change was sent to all listeners before the context was cancelled, sent is true. Otherwise, sent is false and some listeners may not have received the notification. In either case the map itself is always updated, so Get returns the new value.
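The relationship between the stored value and the sent result can be sketched with a simplified map. This is not the package's implementation: `notifyingMap`, `change`, `store`, and `demoStore` are hypothetical names, and the sketch assumes listeners are plain channels notified in order while a mutex is held.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// change is a hypothetical notification type for this example.
type change struct {
	Key, Old, New string
}

// notifyingMap is a simplified, hypothetical stand-in for a map that
// notifies listeners about writes.
type notifyingMap struct {
	mu        sync.Mutex
	data      map[string]string
	listeners []chan change
}

// store writes the value first and notifies afterwards, so the map is
// updated even when notification is cut short by context cancellation.
func (m *notifyingMap) store(ctx context.Context, key, value string) (old string, present, sent bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	old, present = m.data[key]
	m.data[key] = value
	ev := change{Key: key, Old: old, New: value}
	for _, l := range m.listeners {
		select {
		case l <- ev:
		case <-ctx.Done():
			// Some listeners may not have been notified.
			return old, present, false
		}
	}
	return old, present, true
}

// demoStore stores a value while the only listener never receives and the
// context is already cancelled.
func demoStore() (stored string, sent bool) {
	m := &notifyingMap{
		data:      map[string]string{},
		listeners: []chan change{make(chan change)}, // nobody receives
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, _, sent = m.store(ctx, "k", "v1")
	return m.data["k"], sent
}

func main() {
	stored, sent := demoStore()
	fmt.Println(stored, sent) // prints v1 false
}
```

A caller that sees sent=false can therefore still rely on the new value being in the map, but should not assume that every listener observed the change.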

type MapEvent

type MapEvent[K comparable, V any] struct {
	Type      MapEventType
	Key       K
	OldValue  V
	NewValue  V
	Timestamp time.Time
}

type MapEventType

type MapEventType int
const (
	MapEventInsert MapEventType = iota + 1
	MapEventReplace
	MapEventDelete
)
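One plausible use of these event types is keeping a local snapshot in sync with the map, for example a snapshot obtained together with an event channel. The sketch below redeclares `MapEvent` and `MapEventType` locally, mirroring the declarations above, so that it compiles on its own; `apply` and `demoApply` are hypothetical helpers, and the meaning attached to each event type is an assumption based on its name.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the documented types, redeclared so this example is
// self-contained.
type MapEventType int

const (
	MapEventInsert MapEventType = iota + 1
	MapEventReplace
	MapEventDelete
)

type MapEvent[K comparable, V any] struct {
	Type      MapEventType
	Key       K
	OldValue  V
	NewValue  V
	Timestamp time.Time
}

// apply is a hypothetical helper that replays one event onto a snapshot.
// Assumption: Insert and Replace carry the new value in NewValue, and
// Delete removes the key.
func apply[K comparable, V any](snapshot map[K]V, ev MapEvent[K, V]) {
	switch ev.Type {
	case MapEventInsert, MapEventReplace:
		snapshot[ev.Key] = ev.NewValue
	case MapEventDelete:
		delete(snapshot, ev.Key)
	}
}

// demoApply replays an insert, a replace, and a delete onto a snapshot.
func demoApply() map[string]int {
	snapshot := map[string]int{"a": 1}
	now := time.Now()
	events := []MapEvent[string, int]{
		{Type: MapEventInsert, Key: "b", NewValue: 2, Timestamp: now},
		{Type: MapEventReplace, Key: "a", OldValue: 1, NewValue: 10, Timestamp: now},
		{Type: MapEventDelete, Key: "b", OldValue: 2, Timestamp: now},
	}
	for _, ev := range events {
		apply(snapshot, ev)
	}
	return snapshot
}

func main() {
	fmt.Println(demoApply()) // prints map[a:10]
}
```

Note that the constants start at 1 (`iota + 1`), so the zero value of MapEventType matches none of them; the switch above simply ignores such an event.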

type Value

type Value[T any] struct {
	// contains filtered or unexported fields
}

func NewValue

func NewValue[T any](initial T) *Value[T]

func (*Value[T]) Changes

func (v *Value[T]) Changes(ctx context.Context, backpressure bool, bufferSize int) (value T, changes <-chan ValueEvent[T])

func (*Value[T]) Get

func (v *Value[T]) Get() (value T, modified time.Time)

func (*Value[T]) Set

func (v *Value[T]) Set(ctx context.Context, value T) (old T, ok bool)

type ValueEvent

type ValueEvent[T any] struct {
	Timestamp time.Time
	Old       T
	New       T
}
