reactive

package
v0.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2023 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package reactive implements abstractions for connecting generator functions to observers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogger added in v0.3.0

func SetLogger(newLogger func(Level, interface{}, string, ...interface{}))

SetLogger sets the logger for all logging in the reactive package. Default logger implementation:

func(level Level, source interface{}, formatString string, args ...interface{}) {
  if level < Warning {
    return
  }
  message := fmt.Sprintf(formatString, args...) // delay message formatting till level can be evaluated
  log.Printf("%s [%s]: %s\n", level, source, message)
}

Note that the formatting string should not be evaluated until after filtering by log Level. Formatting Verbose and Debug logs can create superfluous cpu load.

Types

type CancellableSource added in v0.3.0

type CancellableSource[T any] interface {
	Source[T]
	// Cancel stops the propagation of items from this source. If the [CancellableSource] is based on a generator, the
	// generator is no longer polled.
	Cancel() error
}

CancellableSource is a Source that can be canceled. or a literal (Just).

func FromGenerator

func FromGenerator[T any](generator func() (*T, error)) CancellableSource[T]

FromGenerator returns a Source from the provided generator function. The generator provided will be polled for items:

  • The generator should return (*T, nil) when an item is available.
  • If no item is available, the generator should return (nil, nil).
  • If the generator is complete it should return (*T, GeneratorFinished)
  • Finally, errors may be returned via (nil, error).

Note that this source can be cancelled via [CancellableSource.Cancel].

func FromGeneratorWithDefaultBackoff added in v0.3.0

func FromGeneratorWithDefaultBackoff[T any](generator func() (*T, error)) CancellableSource[T]

FromGeneratorWithDefaultBackoff is similar to FromGenerator, but waits at least 250ms and at most 10s

func FromGeneratorWithExponentialBackoff

func FromGeneratorWithExponentialBackoff[T any](
	generator func() (*T, error),
	maxBackoff float64,
	backoffMultiplier float64,
) CancellableSource[T]

FromGeneratorWithExponentialBackoff is similar to FromGenerator, but accepts parameters for implementing a exponential backoff to prevent rapid polling.

  • maxBackoff maximum time to wait in milliseconds
  • backoffMultiplier the multiplier m in m*2^e where e is the error count

type GeneratorFinished added in v0.3.0

type GeneratorFinished struct{} //todo: should generators just return three things?

GeneratorFinished should be returned from a generator function when the generator completes successfully, but no further items are expected.

func (*GeneratorFinished) Error added in v0.4.0

func (g *GeneratorFinished) Error() string

Error implements the error interface

type Level added in v0.3.0

type Level int

Level represents a log level ranging from Verbose to Error.

  • Verbose logging indicates intent and logs every item handled.
  • Debug logging indicates state changes and logs internal fields.
  • Info logging happensf for once per Source status changes and handled errors.
  • Warning logging occurs when a function returns an error resulting in a message being discarded.
  • Error logging occurs when a source shuts down unexpectedly without cleaning up.

The default logger logs Warning and Error level. Use SetLogger to change this behavior.

const (
	// Verbose logging indicates intent and logs every item handled. For example, logging the intent to send an item to a sink
	Verbose Level = iota
	// Debug logging indicates state changes and logs internal fields. For example a generator's error count increasing or an observer (Source.Observe) being registered.
	Debug
	// Info logging occurs for once per Source status changes and handled errors. For example a source successfully shuts down, or an error is encountered and handled
	Info
	// Warning logging occurs when a function returns an error resulting in a message being discarded. For example when a sink returns an error.
	Warning
	// Error logging occurs when a sink, shutdown hook or mapper panics.
	Error
)

func (Level) String added in v0.4.0

func (l Level) String() string

String() returns the name of the log Level right padded with spaces to seven characters.

type Source

type Source[T any] interface {
	// UponClose registers a hook to run then this source shuts down.
	// All registered functions will complete before AwaitCompletion unblocks.
	UponClose(func())
	// Observe registers a sink that will observe each item that flows through this source.
	// Each sink will be called in its own go routine.
	Observe(func(T) error)
	// Start begins pumping items through the source.
	// Generators start polling, channels start listening, literals start pumping.
	//
	// This method can be called multiple times, only the first has any effect (locking via sync.OnceFunc).
	Start()
	// AwaitCompletion blocks until the source is closed and all UponClose hooks are complete.
	AwaitCompletion()
}

Source is a producer of items. A Source can be based on a generator function (FromGenerator, FromGeneratorWithExponentialBackoff), a channel (FromChan) or a literal (Just).

func Buffer

func Buffer[T any](source Source[T], size int) Source[T]

Buffer observes one Source, and returns a Source backed by a circular buffer with the requested size. This is implemented via a channel.

The returned Source is already started.

func FromChan

func FromChan[T any](channel chan T) Source[T]

FromChan returns a Source from the provided channel.

func FromSlice

func FromSlice[T any](data []T) Source[T]

FromSlice returns a Source from the provided slice of items.

func Just

func Just[T any](data ...T) Source[T]

Just returns a Source from the provided items.

func Map

func Map[T any, V any](source Source[T], mapper func(T) (V, error)) Source[V]

Map observes one Source, transform the items observed with the provided mapper function, and returns a Source of the transformed items. If the mapper returns an error the item dropped, it is not retried.

The returned Source is already started.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL