Documentation ¶
Overview ¶
Package reactive implements abstractions for connecting generator functions to observers.
Index ¶
- func SetLogger(newLogger func(Level, interface{}, string, ...interface{}))
- type CancellableSource
- func FromGenerator[T any](generator func() (*T, error)) CancellableSource[T]
- func FromGeneratorWithDefaultBackoff[T any](generator func() (*T, error)) CancellableSource[T]
- func FromGeneratorWithExponentialBackoff[T any](generator func() (*T, error), maxBackoff float64, backoffMultiplier float64) CancellableSource[T]
- type GeneratorFinished
- type Level
- type Source
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetLogger ¶ added in v0.3.0
SetLogger sets the logger for all logging in the reactive package. Default logger implementation:
func(level Level, source interface{}, formatString string, args ...interface{}) { if level < Warning { return } message := fmt.Sprintf(formatString, args...) // delay message formatting till level can be evaluated log.Printf("%s [%s]: %s\n", level, source, message) }
Note that the formatting string should not be evaluated until after filtering by log Level. Formatting Verbose and Debug logs can create superfluous cpu load.
Types ¶
type CancellableSource ¶ added in v0.3.0
type CancellableSource[T any] interface { Source[T] // Cancel stops the propagation of items from this source. If the [CancellableSource] is based on a generator, the // generator is no longer polled. Cancel() error }
CancellableSource is a Source that can be canceled. or a literal (Just).
func FromGenerator ¶
func FromGenerator[T any](generator func() (*T, error)) CancellableSource[T]
FromGenerator returns a Source from the provided generator function. The generator provided will be polled for items:
- The generator should return (*T, nil) when an item is available.
- If no item is available, the generator should return (nil, nil).
- If the generator is complete it should return (*T, GeneratorFinished)
- Finally, errors may be returned via (nil, error).
Note that this source can be cancelled via [CancellableSource.Cancel].
func FromGeneratorWithDefaultBackoff ¶ added in v0.3.0
func FromGeneratorWithDefaultBackoff[T any](generator func() (*T, error)) CancellableSource[T]
FromGeneratorWithDefaultBackoff is similar to FromGenerator, but waits at least 250ms and at most 10s
func FromGeneratorWithExponentialBackoff ¶
func FromGeneratorWithExponentialBackoff[T any]( generator func() (*T, error), maxBackoff float64, backoffMultiplier float64, ) CancellableSource[T]
FromGeneratorWithExponentialBackoff is similar to FromGenerator, but accepts parameters for implementing a exponential backoff to prevent rapid polling.
- maxBackoff maximum time to wait in milliseconds
- backoffMultiplier the multiplier m in m*2^e where e is the error count
type GeneratorFinished ¶ added in v0.3.0
type GeneratorFinished struct{} //todo: should generators just return three things?
GeneratorFinished should be returned from a generator function when the generator completes successfully, but no further items are expected.
func (*GeneratorFinished) Error ¶ added in v0.4.0
func (g *GeneratorFinished) Error() string
Error implements the error interface
type Level ¶ added in v0.3.0
type Level int
Level represents a log level ranging from Verbose to Error.
- Verbose logging indicates intent and logs every item handled.
- Debug logging indicates state changes and logs internal fields.
- Info logging happensf for once per Source status changes and handled errors.
- Warning logging occurs when a function returns an error resulting in a message being discarded.
- Error logging occurs when a source shuts down unexpectedly without cleaning up.
The default logger logs Warning and Error level. Use SetLogger to change this behavior.
const ( // Verbose logging indicates intent and logs every item handled. For example, logging the intent to send an item to a sink Verbose Level = iota // Debug logging indicates state changes and logs internal fields. For example a generator's error count increasing or an observer (Source.Observe) being registered. Debug // Info logging occurs for once per Source status changes and handled errors. For example a source successfully shuts down, or an error is encountered and handled Info // Warning logging occurs when a function returns an error resulting in a message being discarded. For example when a sink returns an error. Warning // Error logging occurs when a sink, shutdown hook or mapper panics. Error )
type Source ¶
type Source[T any] interface { // UponClose registers a hook to run then this source shuts down. // All registered functions will complete before AwaitCompletion unblocks. UponClose(func()) // Observe registers a sink that will observe each item that flows through this source. // Each sink will be called in its own go routine. Observe(func(T) error) // Start begins pumping items through the source. // Generators start polling, channels start listening, literals start pumping. // // This method can be called multiple times, only the first has any effect (locking via sync.OnceFunc). Start() // AwaitCompletion blocks until the source is closed and all UponClose hooks are complete. AwaitCompletion() }
Source is a producer of items. A Source can be based on a generator function (FromGenerator, FromGeneratorWithExponentialBackoff), a channel (FromChan) or a literal (Just).
func Buffer ¶
Buffer observes one Source, and returns a Source backed by a circular buffer with the requested size. This is implemented via a channel.
The returned Source is already started.