stream

package
v0.0.0-...-3c148fd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2025 License: MIT, Unlicense Imports: 13 Imported by: 4

Documentation

Overview

Package stream provides the ability to safely read asynchronous, dynamic application state from Gio layout code.

It does so by providing several constructs:

  • Controllers, which connect the lifecycle of asynchronously generated data streams to the frame lifecycle of a Gio application window. Controllers handle invalidating the window when new data is available on active streams and also shut down streams when they are no longer in use.
  • Streams, which are bound to a particular controller and provide asynchronous state updates while they are in use by visible widgets. When not in use, their state updates shut down to conserve resources.
  • Transformations are functions that can operate on stream channels to easily create data streams from disparate sources of asynchronous information.
  • Inputs provide data from the application GUI that can be safely read within a stream's asynchronous processing.

Additionally, stream provides constructs for supervising persistent or stateful asynchronous operations from your UI:

  • Mutators, which maintain a list of running stateful operations and ensure that they all shut down as cleanly as possible before your application exits.
  • MutationPools, which allow you to manage groups of related stateful operations and prevent duplicate operations from being created at the same time.
  • Mutations, which are a (streamable) handle onto a running stateful operation.

Controllers

Each window using streams needs its own Controller, which can be constructed with NewController. The controller is used to construct streams bound to that window, and has a method (Controller.Sweep) that must be invoked every frame in order to ensure that streams which are not in use go inert.

Typical use looks like this:

func loop(w *app.Window) error {
	// Make a context that lives as long as the window.
	windowCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Make a controller for this window.
	controller := stream.NewController(windowCtx, w.Invalidate)

	var ops op.Ops
	for event := range w.Events() {
		switch event := event.(type) {
		case system.DestroyEvent:
			return event.Err
		case system.FrameEvent:
			gtx := layout.NewContext(&ops, event)

			// Your layout here, passing the controller so that code can instantiate streams with it.
			layoutUI(gtx, controller)

			event.Frame(gtx.Ops)

			// Transition any active unread streams to be inactive.
			controller.Sweep()
		}
	}
	return nil
}

Providing the window's Invalidate function to NewController ensures that the controller can trigger a window invalidation when new data arrives on a stream visible within the window, ensuring that the UI is redrawn automatically.

Note that Controller.Sweep is being invoked every frame.

Streams

A Stream is a restartable asynchronous computation which is automatically started and stopped based on whether it is actively used by the GUI. You can read from a stream to receive the most recent value created by the stream (if any is available yet).

Only the most recent value sent over a stream is important. Application programs should assume that any given element sent may be overridden by values sent after it. As such, you should not write streams to trickle updates over individual elements in a collection, but should instead send entire updated collections over the stream.

You can construct a Stream with New:

// Make a stream that will emit increasing integers every second.
myStream := stream.New(controller, func(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		ticks := 0
		for {
			select {
			case <-ticker.C:
				ticks++
				out <- ticks
			case <- ctx.Done():
				return
			}
		}
	}()
	return out
})

The controller provided to NewStream will be responsible for starting/stopping the stream based on whether it is in use. The Provider function provided is responsible for providing a receive-only channel of values that will close when the provided context is cancelled.

In most real applications, the Provider function passed to NewStream will perform I/O or interface with another goroutine in order to provide correct updates over the stream. See the section on Transformations for more about how to construct real-world provider functions.

Reading a stream is accomplished through one of the Read* methods. Of these, *Stream.Read is the most fundamental, and will be discussed first:

ticks, status := myStream.Read(gtx)
if Status == stream.Waiting {
	// We haven't received a value over the stream yet.
} else {
	// We have a value, so do something with ticks.
}

Reading from a stream never blocks, so it's quite common to read a stream before any value is available yet. In that case, the stream's returned status will be Waiting, and the returned value should be ignored.

If the status is Emitting, a new value/error are coming out of the stream this frame.

If the status is Cached, the returned value is not new, but has been emitted before.

Often your UI will not care about the status at all, but would rather simply work with a default value until the first data is available on the stream. To facilitate this, we have *Stream.ReadDefault:

ticks := myStream.ReadDefault(gtx, 0)

You can rely upon the returned value to either be your supplied default value or the latest value from the stream.

Finally, sometimes you want to synchronize an existing variable or struct field with the latest data from a stream. *Stream.ReadInto accomplishes this:

var ticks int // Assume we declared this elsewhere, perhaps as a field.
myStream.ReadInto(gtx, &ticks, 0)

As you can see, reading from a stream does not require a great deal of code unless your use-case demands special consideration of the status of the stream.

Results

The Result type provides an easy way to send both a value and error across a channel packed into a single type. It isn't meant to be a general-purpose type, but rather to make it easier to propagate errors from the providers of your streams to your UI when you need to. We provide a number of helper transformations and stream types to make working with streams of results easier.

ResultStream accepts a ProviderR instead of a Provider and allows you to read the error from a stream value ergonomically. We can rewrite the above tick stream example to use Result like so:

// Make a stream that will emit increasing integers (or an error) every second.
myResultStream := stream.NewR(controller, func(ctx context.Context) <-chan stream.Result[int] {
	out := make(chan stream.Result[int])
	go func() {
		defer close(out)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		ticks := 0
		for {
			select {
			case <-ticker.C:
				ticks++
				if ticks %2 == 0 {
					out <- stream.ResultFrom(ticks, nil)
				} else {
					out <- stream.ResultFrom(0, fmt.Errorf("odd number"))
				}
			case <- ctx.Done():
				return
			}
		}
	}()
	return out
})

We can then read from it with:

ticks, status, err := myResultStream.Read(gtx)
if Status == stream.Waiting {
	// We haven't received a value over the stream yet.
} else if err != nil {
	// We have an error.
} else {
	// We have a value, so do something with ticks.
}

We also provide *ResultStream.ReadDefault and *ResultStream.ReadInto for use-cases where the status is less important.

Transformations

Most of the top-level functions in this package are transformations. They make it easier to construct channels of Ts or Result[T]s, combine those channels, and change their element type. They are intended to be used within the sourceProviders passed to New and NewR. Consider the following:

// Simple function to emit an increasing integer each time the provided duratione elapses.
func tickerProvider(ctx context.Context, dur time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		ticks := 0
		for {
			select {
			case <-ticker.C:
				ticks++
				out <- ticks
			case <- ctx.Done():
				return
			}
		}
	}()
	return out
}

// Make a stream that will emit the product of two streams of integers as a floating-point
// value.
myStream := stream.New(controller, func(ctx context.Context) <-chan float64 {
	everySecond := tickerProvider(ctx, time.Second)
	everyFewSeconds := tickerProvider(ctx, time.Second*3)
	products := stream.Zip(everySecond, everyFewSeconds, func(a, b int) int {
		return a*b
	})
	asFloats := stream.Transform(products, func(a int) float64 {
		return float64(a)
	}
	return asFloats
})

As you can see, these functions make it relatively easy to leverage multiple [Provider]s and [ProviderR]s and combine their results. The above could have converted the output type to be a float64 within the Zip call, but uses the long way to demonstrate how to use Transform. Note that each transformation employed in constructing a stream will add some latency before the UI receives new values, so try not to let your transformation pipeline get too deep.

Transformations with an R suffix work with streams of Result elements and provide extra behaviors to make error handling easier.

Inputs

If your user interface needs to perform a complex filter operation (too expensive to do directly within the UI), you can use an Input to push the filter criteria into your sourceProvider. Construct an Input with its initial value using NewInput.

You can then reference that input within a Provider function for a stream:

// Make a stream that will emit the product of the counter input and a value that increments
// each second.
count := NewInput(5)
myStream := stream.New(controller, func(ctx context.Context) <-chan int {
	everySecond := tickerProvider(ctx, time.Second)
	return stream.Zip(everySecond, count.Stream(ctx), func(a, b int) int {
		return a*b
	})
})

As the user interacts with your UI, you can update the value of your input with Input.Send:

count.Send(10)

This method will not block unless you are performing concurrent sends, and will enable your sourceProvider to consume whatever value you send. Much like streams, only the most recently-sent value is guaranteed to be consumed by the stream.

Mutators

A Mutator is created at the start of an application to supervise stateful operations. Creating one looks like this:

// Make a context that will last the lifetime of your application.
appCtx, cancel := context.WithCancel(context.Background())
defer cancel()
mutator := stream.NewMutator(appCtx, time.Second*5)

// Run your application here, blocking until you want to end the app.

// Block until running mutations end.
mutator.Shutdown()

The mutator will wait up to the duration provided in its constructor for all running mutations to end. After that it will cancel any mutation contexts that are not already cancelled and wait up to the provided duration again. Then it will unblock and allow the application process to exit.

MutationPools

A MutationPool wraps a Mutator and provides the ability to launch stateful operations associated with a unique "key". This key mechanism can be used to ensure that two logically-identical mutations are not created at the same time for applications that need that.

A mutation pool is created like so:

// Assuming you already have a mutator.
var mutator *Mutator
// Create a pool using ints as the type of the unique key and time.Time's as the value type
// emitted by the supervised mutations.
tickerPool := stream.NewMutationPool[int,time.Time](mutator)

You can use any comparable type as a key, and any type as the mutation value type. Usually applications will want to encode identifiers for what a mutation is acting upon and how into the key. The value type should be whatever the result of the mutation is.

You can stream running mutations from the UI with:

// Assuming the MutationPool above and a stream controller.
runningTickersStream := stream.New(controller, tickerPool.Stream)

running, status := runningTickersStream.Read(gtx)

Mutations

A Mutation is a stateful process running asynchronously. You can use them for almost anything, from modifying a database to tracking the progress of a complex user flow through your application. The important difference between a mutation and a normal stream is that a mutation is *not* cancelled when the UI element that launched it stops being drawn. Mutations persist until they complete or the application shuts down (at which point the supervising Mutator will try to ensure that mutations have a chance to complete before killing them).

Mutations can take several forms:

  • Processes that make a change to application state (like a database query or API request) and then exit.
  • Processes that manage a multi-step sequence of operations (like guiding a user through a tour of an application across many different pages) and then exit when done.
  • Processes that run providing useful services for the lifetime of your application (a mutation can maintain a frequently-needed, frequently-updated value for easy access by your UI).

The only requirement for a mutation (encapsulated by the MutationProvider type) is that it eventually closes its output channel (signalling that it is complete). Mutations must terminate themselves as quickly as possible (closing their output channel) when their context is cancelled.

To launch a mutation, you need a MutationPool to host it, and the mutation must result in a sequence of values matching the MutationPool's value type. To reuse the pool from the previous example blocks, we need a mutation that produces a sequence of time.Time values.

key := 0
mutation, isNew := stream.Mutate(tickerPool, key, func(ctx context.Context) <-chan time.Time {
	out := make(chan time.Time)
	go func() {
		defer close(out)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case t := <-ticker.C:
				out <- t
			}
		}
	}()
	return out
})

The above creates a mutation that emits times from a persistent ticker. That ticker will only stop when the mutation's context is cancelled. The returned mutation value is a handle on this running mutation. It can be used to stream values from the output channel of a mutation. The isNew return value tells the caller whether the mutation pool has returned an already-running mutation for the same key, or has created a new mutation for that key.

You can read the results of a mutation from the UI by creating a stream:

tickStream := stream.New(controller, mutation.Stream)

tick, status := tickStream.Read(gtx)

Like any other stream, you can apply transformation functions within a MutationProvider to modify the output of the mutation for *all* consumers or you can apply transformation functions in a Provider to modify the output for just the current stream.

Index

Constants

This section is empty.

Variables

View Source
var ErrNilController = fmt.Errorf("stream has nil controller")

ErrNilController indicates that an improperly-constructed stream was read. This always indicates a bug constructing the stream, usually making a stream from a struct literal instead of using the constructor function.

View Source
var ErrNilProvider = fmt.Errorf("stream has nil provider function")

ErrNilProvider indicates that an improperly-constructed stream was read. This always indicates a bug constructing the stream, usually making a stream from a struct literal instead of using the constructor function.

Functions

func Copy

func Copy[T any](in <-chan T, deepCopy func(T) T) (<-chan T, <-chan T)

Copy takes an input channel and splits it into two output channels. On each input, deepCopy will be used to produce two copies of the value which are sent on the output channels.

func CopyX

func CopyX[T any](in <-chan Result[T], deepCopy func(T) T) (<-chan Result[T], <-chan Result[T])

CopyX takes an input channel and splits it into two output channels. If an error result is input, the error will be sent in two results as output (the error value is not copied). If a non-error result is input, deepCopy will be used to produce two copies of the value which are sent on the output channels.

func Debounce

func Debounce[T any](input <-chan T, interval time.Duration) <-chan T

Debounce coalesces values emitted on the input channel so that only one value is emitted each interval. If it is not waiting after emitting a recent value, it will immediately emit a new value, but will collect values emitted within interval time after that and will only emit the final value.

func DebounceUntilStable

func DebounceUntilStable[T any](input <-chan T, interval time.Duration) <-chan T

DebounceUntilStable emits a value one `interval` after the last received input. If the inputs are rapid, the timer will reset on each input until the inputs settle enough for interval to pass between them.

func DeepCopyResult

func DeepCopyResult[T any](deepCopyValue func(T) T) func(Result[T]) Result[T]

DeepCopyFunc returns a DeepCopy Implementation for this Result using a provided function that deeply-copies values. If the Result contains an error instead of a value, the error is not copied but propagated as-is.

func Delayed

func Delayed[T, U any](input <-chan U, ctx context.Context, provider func(context.Context) <-chan T) <-chan T

Delayed waits until input is closed to invoke provider. If the provided context is cancelled, it will immediately close its output channel and exit. If the input does close, the provider func will be invoked with the provided context.

func Distinct

func Distinct[T comparable](input <-chan T) <-chan T

Distinct is DistinctFunc using "a == b" as the same func.

func DistinctFunc

func DistinctFunc[T any](input <-chan T, same func(a, b T) bool) <-chan T

DistinctFunc returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.

func DistinctFuncR

func DistinctFuncR[T any](input <-chan Result[T], same func(a, b T, aErr, bErr error) bool) <-chan Result[T]

DistinctFuncR returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.

func DistinctFuncX

func DistinctFuncX[T any](input <-chan Result[T], same func(old, new T) bool) <-chan Result[T]

DistinctFuncX returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.

func DistinctR

func DistinctR[T comparable](input <-chan Result[T]) <-chan Result[T]

DistinctR is DistinctFuncR using "a == b && errors.Is(aErr, bErr)" as the same func.

func Filter

func Filter[T, U any](in <-chan T, keep func(T) (U, bool)) <-chan U

Filter selectively drops elements from the input channel, opting to send nothing on the output. If the keep function returns true, the returned U will be emitted. Otherwise it will be dropped.

func FilterR

func FilterR[T, U any](in <-chan Result[T], keep func(T, error) (U, error, bool)) <-chan Result[U]

FilterR implements Filter for Result channels, automatically unpacking the value/error of the input and repacking the U and error returned by keep into a Result.

func FilterX

func FilterX[T, U any](in <-chan Result[T], keep func(T) (U, bool)) <-chan Result[U]

Filter selectively drops elements from the input channel, opting to send nothing on the output. If the keep function returns true, the returned U will be emitted. Otherwise it will be dropped. If the input value is an error result, the keep function will never be invoked and the output will be a result containing the same error. This prevents errors from being silently dropped.

func Join

func Join[T, U any](ctx context.Context, inputs []<-chan T, deepCopy func(T) T, merge func(old, new T) T, join func(a []T) U) <-chan U

Join combines the output elements of all of the channels in inputs using the join func. Join will not invoke join until each element of inputs has emitted at least one value. Each time a value is received from a channel in inputs, the value will be merged with the prior value emitted by that channel (if any) using the provided merge func. join will always be invoked with the most recent (merged) value emitted by each channel in inputs (ordered identically to the inputs that produced them). Join will shut down and close its output if the provided context is cancelled or if all input channels close. If there are zero inputs, Join will call join with a zero-length slice of T and emit that value. It will close its output when ctx is cancelled.

func JoinLatest

func JoinLatest[T, Out any](ctx context.Context, inputs []<-chan T, deepCopy func(T) T, joinFunc func(a []T) Out) <-chan Out

JoinLatest is like Join, but always uses the new value in its merge func.

func Latest

func Latest[T any](in <-chan T) <-chan T

Latest creates a buffered channel that will exhibit no backpressure. While waiting to send its next output element, it will always be ready to receive a new input, and it will replace its next outbound element with any new element. This is useful when you want to skip doing the work for intermediate values when multiple values are flowing through a stream pipeline.

func LatestFilter

func LatestFilter[In any, Out any](inFn func() <-chan In, tx func(In) (Out, bool)) chan Out

LatestFilter filters values, capturing the latest value, without backpressure.

The input channel is provided by a function executed within the processing goroutine. This allows the construction of the channel to block if necessary. The inFn may return nil to indicate that the LatestFilter should immediately shut down instead of processing input.

Exits on when the channel returned by inFn closes.

func MapRebuilder

func MapRebuilder[In ~map[K]T, Out any, K comparable, T comparable](input <-chan In, provider func(context.Context, In) <-chan Out) <-chan Out

MapRebuilder is a simplified ProviderRebuilder for use with map elements that have comparable keys and values.

func Multiplex

func Multiplex[In, Out, State any](
	input <-chan In,
	choose func(ctx context.Context, state State, val In) (<-chan Out, State),
) <-chan Out

Multiplex allows dynamically reconfiguring its output stream based on the last value of its input stream. The provided choose function is given the latest results from the input channel, and is responsible for choosing whether or not change the output stream. If the choose function returns a non-nil channel, the MultiplexR output channel will emit values read from that channel until the next input value arrives. Note that choose must return a non-nil stream at least once in order for any output values to ever be emitted. The ctx passed to choose is a new context that can be used to construct a new stream channel from a provider function. When choose returns a non-nil channel, any previously- created channel will have its context cancelled, ensuring no leak of goroutines. The State type allows the choose function to implement stateful operations, like ensuring that it isn't about to return a new copy of the same stream channel it emitted on its previous invocation. The choose function accepts and returns a variable of type State, and the retuned state will always be passed back to the next invocation of choose (even if the choose func returns a nil channel, the returned state will be passed to the next call to choose).

func MultiplexR

func MultiplexR[In, Out, State any](
	input <-chan Result[In],
	choose func(ctx context.Context, state State, val In, err error) (<-chan Result[Out], State),
) <-chan Result[Out]

MultiplexR allows dynamically reconfiguring its output stream based on the last value of its input stream. The provide choose function is given the latest results from the input channel, and is responsible for choosing whether or not change the output stream. If the choose function returns a non-nil channel, the MultiplexR output channel will emit values read from that channel until the next input value arrives. Note that choose must return a non-nil stream at least once in order for any output values to ever be emitted. The ctx passed to choose is a new context that can be used to construct a new stream channel from a provider function. When choose returns a non-nil channel, any previously- created channel will have its context cancelled, ensuring no leak of goroutines. The State type allows the choose function to implement stateful operations, like ensuring that it isn't about to return a new copy of the same stream channel it emitted on its previous invocation. The choose function accepts and returns a variable of type State, and the retuned state will always be passed back to the next invocation of choose (even if the choose func returns a nil channel, the returned state will be passed to the next call to choose).

func PostProcess

func PostProcess[T any](input <-chan T, f func(t T) T) <-chan T

PostProcess immediately emits any value received from input, but asynchronously invokes f on any values received and emits the resulting data afterward. This can be used to perform a time-consuming post-processing step while also getting raw data through the entire stream pipeline with as low of latency as possible. This operation only makes sense for data that can be used in its raw form as well as its processed form, like images.

func PostProcessR

func PostProcessR[T any](input <-chan Result[T], f func(t T, err error) (T, error)) <-chan Result[T]

PostProcessR immediately emits any value received from input, but asynchronously invokes f on any values received and emits the resulting data afterward. This can be used to perform a time-consuming post-processing step while also getting raw data through the entire stream pipeline with as low of latency as possible. This operation only makes sense for data that can be used in its raw form as well as its processed form, like images.

func ProviderRebuilder

func ProviderRebuilder[In, Out any](
	input <-chan In,
	deepCopy func(In) In,
	equal func(a, b In) bool,
	provider func(context.Context, In) <-chan Out,
) <-chan Out

ProviderRebuilder uses an input stream to configure an output stream. The first time a value is received on input, the provider function will be invoked on that input to produce an output stream.

Each subsequent value received on input will be compared with the previous value for equality, using the provided equal function, and the provider function will be re-run when the new input value is not considered equal. The previous invocation of the provider will have its context cancelled and its output channel drained.

The provider function may return nil to signal that the previous provider's output (if any) should still be used.

The provided deepCopy function must make a copy of In that is equal to In and shares no memory with it.

func RebuildElements

func RebuildElements[In, Out any](
	input <-chan []In,
	deepCopyInput func(In) In,
	deepCopyOutput func(Out) Out,
	equal func(a, b In) bool,
	provider func(context.Context, In) <-chan Out,
) <-chan []Out

RebuildElements is a variant of ProviderRebuilder that operates on slices of input data. It only requires deepCopy and equality operations for element types, not entire slices.

func RebuildElementsX

func RebuildElementsX[In, Out any](
	input <-chan Result[[]In],
	deepCopyInput func(In) In,
	deepCopyOutput func(Out) Out,
	equal func(a, b In) bool,
	provider func(context.Context, In) <-chan Result[Out],
) <-chan Result[[]Out]

RebuildElementsX is RebuildElements for Result-wrapped slices.

func RebuilderX

func RebuilderX[In, Out any](
	input <-chan Result[In],
	deepCopy func(In) In,
	equal func(a, b In) bool,
	provider func(context.Context, In) <-chan Result[Out],
) <-chan Result[Out]

RebuilderX is like ProviderRebuilder, but handles result-value streams by propagating the errors automatically. If input values contain errors, they will be passed along without invoking the user-provided functions, transformed to values on an appropriate output result channel.

func Resulter

func Resulter[T any](fn func(ctx context.Context) (T, error)) func(context.Context) Result[T]

Resulter adapts a (T, error) return into a Result[T] for convenient use with result based APIs.

func SliceRebuilder

func SliceRebuilder[In ~[]T, Out any, T comparable](input <-chan In, provider func(context.Context, In) <-chan Out) <-chan Out

SliceRebuilder is a simplified stream.ProviderRebuilder specialized to slices of comparable data.

func Transform

func Transform[T, U any](input <-chan T, transformer func(T) U) <-chan U

Transform returns a new channel which will emit every value sent over input transformed by the transformer function.

func TransformR

func TransformR[T, U any](input <-chan Result[T], transformer func(T, error) (U, error)) <-chan Result[U]

TransformR returns a new channel which will emit every value sent over input transformed by the transformer function.

func TransformX

func TransformX[T, U any](input <-chan Result[T], transformer func(T) (U, error)) <-chan Result[U]

TransformX returns a new channel which will emit every value sent over input transformed by the transformer function. The transformer function will not be invoked for inputs that contain an error. The error will be automatically re-emitted as a Result[U].

func Value

func Value[T any](ctx context.Context, f func(ctx context.Context) T) <-chan T

Value returns a channel built from [f]. The channel will emit whatever [f] returns, and will close when the provided context is closed.

func ValueProvider

func ValueProvider[T any](f func(ctx context.Context) T) func(ctx context.Context) <-chan T

ValueProvider is like Value, but returns a Provider function. This makes it easier to insert inline into calls to other helpers that expect a Provider.

func WrapR

func WrapR[T any](input <-chan T) <-chan Result[T]

WrapR converts a stream of non-result values into a stream of result values.

func Zip deprecated

func Zip[T, U, Out any](a <-chan T, b <-chan U, zipFunc func(a T, b U) Out) <-chan Out

Zip combines two streams of types T and U into a stream of type Out. Zip works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked to synthesize an output value. The output channel will close when both input channels close.

Deprecated: This function makes it too easy to accidentally alias mutable memory further in the pipeline. For this function to be safe, the zipFunc must copy all values it uses, and that's too easy to forget to do. Use ZipCopy instead.

func ZipCopy

func ZipCopy[T, U, Out any](a <-chan T, b <-chan U, deepCopyA func(T) T, deepCopyB func(U) U, zipFunc func(a T, b U) Out) <-chan Out

ZipCopy combines two streams of types T and U into a stream of type Out. Zip works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked with a copy of the input values to synthesize an output value. The output channel will close when both input channels close.

func ZipCopyX

func ZipCopyX[T, U, Out any](
	a <-chan Result[T],
	b <-chan Result[U],
	deepCopyA func(T) T,
	deepCopyB func(U) U,
	zipFunc func(a T, b U) (Out, error),
) <-chan Result[Out]

ZipCopyX is like ZipCopy, but propagates errors from within results automatically. If either input provides an error, the output will emit that error wrapped in an appropriately-typed result. If both inputs provide an error, the output will emit both errors joined with errors.Join within an appropriately-typed result.

func ZipR deprecated

func ZipR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U, aErr, bErr error) (Out, error)) <-chan Result[Out]

ZipR combines two streams of types T and U into a stream of type Out. ZipR works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked to synthesize an output value. The output channel will close when either both channels close.

Deprecated: This function makes it too easy to accidentally alias mutable memory further in the pipeline. For this function to be safe, the zipFunc must copy all values it uses, and that's too easy to forget to do. Use ZipCopyX instead, for the added benefit of not having to manually handle errors.

func ZipWrapR deprecated

func ZipWrapR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out]

ZipWrapR behaves as ZipR except that it handles combining errors from channels by wrapping them together and will not invoke the zipFunc if either channel errored (instead emitting the error result).

  • If a errored and b did not, set a's error as the output error
  • If b errored and a did not, set b's error as the output error
  • If both a and b errored, set an error wrapping both as the output error
  • If there is an output error, return the zero value of Out and that error.
  • Otherwise, run zipFunc and return its results.

Deprecated: This function makes it too easy to accidentally alias mutable memory further in the pipeline. For this function to be safe, the zipFunc must copy all values it uses, and that's too easy to forget to do. Use ZipCopyX instead, for the added benefit of not having to manually handle errors.

func ZipX deprecated

func ZipX[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out]

ZipX is like Zip, but propagates errors from within results automatically. If either input provides an error, the output will emit that error wrapped in an appropriately-typed result. If both inputs provide an error, the output will emit both errors joined with errors.Join within an appropriately-typed result.

Deprecated: This function makes it too easy to accidentally alias mutable memory further in the pipeline. For this function to be safe, the zipFunc must copy all values it uses, and that's too easy to forget to do. Use ZipCopyX instead.

Types

type ARCPool

type ARCPool[V PoolKeyer[K], K comparable, T any] struct {
	// contains filtered or unexported fields
}

ARCPool provides a reference counted stream. The underlying stream is shared across all streamers using the same key, and shutdown when there are no more streamers interested. This allows for stream resources to be both shared across all streamers and to be released when there are no longer any streamers for a given key.

func NewARCPool

func NewARCPool[V PoolKeyer[K], K comparable, T any](mutator *Mutator, provider func(context.Context, V) <-chan T) *ARCPool[V, K, T]

NewARCPool creates a ARCPoolpowered by the provided mutator.

func (*ARCPool[V, K, T]) Stream

func (m *ARCPool[V, K, T]) Stream(ctx context.Context, v V) <-chan T

Stream returns a channel on which all values are emitted until the ctx is cancelled. The underlying provider is shared across calls using the same key and is closed when there are zero streamers remaining.

func (*ARCPool[V, K, T]) StreamMutations

func (m *ARCPool[V, K, T]) StreamMutations(ctx context.Context) <-chan map[K]*Mutation[T]

StreamMutations returns a channel on which all running mutations will be emitted until the ctx is cancelled.

type ComparableKeyer

type ComparableKeyer[K comparable] struct {
	// contains filtered or unexported fields
}

ComparableKeyer wraps simple comparable types so that they implement PoolKeyer.

func Comparable

func Comparable[K comparable](k K) ComparableKeyer[K]

Comparable wraps the given k as a ComparableKeyer.

func (ComparableKeyer[K]) MutationPoolKey

func (c ComparableKeyer[K]) MutationPoolKey() K

MutationPoolKey returns the comparable key for this ComparableKeyer.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller manages the lifecycle of asynchronous streams of data, connecting them to the frame event loop of a given application window.

func NewController

func NewController(ctx context.Context, invalidator func()) *Controller

NewController constructs a controller bound to the window lifecycle of a single application window. The provided invalidator func must trigger an invalidation of that window, and the Sweep() method must be invoked during the processing of each frame for that window. A controller can be shared among all async loading for a single window.

func (*Controller) Done

func (s *Controller) Done()

Done cleans up all streams. It should be invoked when an application window is closed in order to ensure that all associated processing shuts down with the window.

func (*Controller) Sweep

func (s *Controller) Sweep() (active, swept int)

Sweep cleans up inactive streams. It must be invoked once per frame by the event loop for the window that the stream is bound to. It returns the number of active streams after the sweep, as well as the number of streams that were swept away by the call (making them inert). Note that this is not the same as the number of inert streams.

type ErrUncleanMutatorShutdown

type ErrUncleanMutatorShutdown struct {
	MutationCount int
}

ErrUncleanMutatorShutdown occurs when one or more mutations haven't exited prior to the shutdown timeout elapsing.

func (ErrUncleanMutatorShutdown) Error

func (err ErrUncleanMutatorShutdown) Error() string

type Input

type Input[T any] struct {
	// contains filtered or unexported fields
}

Input provides data to a stream in a threadsafe way and without blocking (unless you perform concurrent sends, then it may block). It can be used to feed data into a stream data transformation pipeline from the UI event loop.

func NewDistinctInput

func NewDistinctInput[T comparable](initialValue T) Input[T]

NewInput constructs an input like NewInput, but the input will not emit values that are the same as the most-recently-emitted values. In this case, sameness is defined as having equal values (==).

func NewInput

func NewInput[T any](initialValue T) Input[T]

NewInput constructs an input, setting it to emit initialValue the first time it is read.

func NewInputEmpty

func NewInputEmpty[T any](filter func(lastValue T, newValue T) bool) Input[T]

NewInputEmpty creates a new input with no initial value. Applications may supply a filter function that will drop sending elements if it returns true.

func NewInputEmptyCtx

func NewInputEmptyCtx[T any](ctx context.Context, filter func(lastValue T, newValue T) bool) Input[T]

NewInputEmptyCtx creates a new input with no initial value. Streams reading from the input will end when the provided context is cancelled. Applications may supply a filter function that will drop sending elements if it returns true.

func (*Input[T]) Send

func (s *Input[T]) Send(t T)

Send emits t on the input, replacing any previously-emitted value that has not already been consumed.

func (*Input[T]) Stream

func (s *Input[T]) Stream(ctx context.Context) <-chan T

Stream returns a channel upon which the Input's data is available. The channel will close when ctx is cancelled. It is safe to call Stream many times on the same input, allowing it to feed into multiple processing pipelines.

type KeyedMutationPool

type KeyedMutationPool[V PoolKeyer[K], K comparable, T any] struct {
	// contains filtered or unexported fields
}

KeyedMutationPool manages a pool of [Mutations]s with a centrally-configured MutationProvider function. This provides a convenient API for pools of mutations whose arguments are contained entirely within their PoolKeyer implementation.

func NewKeyedMutationPool

func NewKeyedMutationPool[V PoolKeyer[K], K comparable, T any](mutator *Mutator, provider func(ctx context.Context, v V) <-chan T) *KeyedMutationPool[V, K, T]

NewKeyedMutationPool defines a new KeyedMutationPool with a provider function that is responsible for actually doing the work of the Mutation used for each key.

func (*KeyedMutationPool[V, K, T]) MutationForKey

func (a *KeyedMutationPool[V, K, T]) MutationForKey(v V) (*Mutation[T], bool)

MutationForKey returns the mutation for a given key and whether it is a new mutation instance.

func (*KeyedMutationPool[V, K, T]) Stream

func (a *KeyedMutationPool[V, K, T]) Stream(ctx context.Context) <-chan map[K]*Mutation[T]

Stream all of the in-progress mutations.

func (*KeyedMutationPool[V, K, T]) StreamKey

func (a *KeyedMutationPool[V, K, T]) StreamKey(ctx context.Context, v V) <-chan T

Stream either starts streaming for a given V or returns the existing channel streaming its output.

type Mutation

type Mutation[T any] struct {
	// contains filtered or unexported fields
}

Mutation is a handle on the results of an asynchronous application state change.

A Mutation is associated with a goroutine, such that the context controls the lifetime of the goroutine and the [done] channel signals when that goroutine has exited.

The goroutine is expected to be well behaved: exiting when context is cancelled and close [done] before returning.

func Mutate

func Mutate[K comparable, T any](pool *MutationPool[K, T], key K, provider MutationProvider[T]) (mut *Mutation[T], isNew bool)

Mutate attempts to start a mutation using the provider and bound to the unique key. If key is already registered to a running mutation, that mutation instance will be returned instead.

To be well behaved, the provider must exit when the context is cancelled and must close the output channel prior to returning. Failure to do so will result in leaked goroutines.

If the returned mutation is nil, no mutations can be started because the mutator is shut down. If the returned mutation is non-nil, the isNew return value indicates whether it is a newly-spawned mutation, or a reference to an already-running mutation for the same key.

func MutateKeyed

func MutateKeyed[V PoolKeyer[K], K comparable, T any](pool *MutationPool[K, T], v V, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)

MutateKeyed is identical to Mutate except that it automatically derives the mutation key from a provided PoolKeyer instead of requiring the key to be passed explicitly. This can sometimes be more ergonomic.

func MutateSimple

func MutateSimple[K comparable, T any](
	pool *MutationPool[K, T],
	key K,
	provider func(context.Context) T,
) (mut *Mutation[T], isNew bool)

MutateSimple makes it easy to construct a mutation that simply calls a synchronous provider function and returns the results once.

func MutateTimeout

func MutateTimeout[K comparable, T any](pool *MutationPool[K, T], key K, timeout time.Duration, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)

MutateTimeout does the same thing as Mutate, but sets a timeout on the mutations' context.

func PersistentStream

func PersistentStream[K comparable, T any](pool *MutationPool[K, T], key K, prov Provider[T]) (mut *Mutation[T], isNew bool)

PersistentStream attempts to start a stream with its lifecycle bound to a mutation pool instead of the UI. This is useful when many UI streams want to consume the same data, as they can all consume the output of a single persistent stream (via the returned mutation's Stream method) instead of each independently querying or generating the source data of interest. If key is already registered to a running mutation, that mutation instance will be returned instead.

NOTE(whereswaldon): the existence of this method points to mutations being misnamed. They are really a kind of stream that has a different lifecycle and are allowed to have side effects within the application. This method provides a way to access the different lifecycle while committing to not have side effects. It's unclear what a better name would be though.

If the returned mutation is nil, no mutations can be started because the mutator is shut down. If the returned mutation is non-nil, the isNew return value indicates whether it is a newly-spawned mutation, or a reference to an already-running mutation for the same key.

func (*Mutation[T]) Cancel

func (m *Mutation[T]) Cancel()

Cancel terminates the mutation. It is safe to call cancel on a nil mutation. It will do nothing.

func (*Mutation[T]) Stream

func (m *Mutation[T]) Stream(ctx context.Context) <-chan T

Stream is a Provider that emits values from the mutation's output channel. It is safe to call Stream on a nil mutation. It will return a channel that will close when the provided context is cancelled.

type MutationPool

type MutationPool[K comparable, T any] struct {
	// contains filtered or unexported fields
}

MutationPool forms a namespace for mutations whereby identical keys are deduplicated. This provides singleflight mechanics, guaranteeing that concurrent attempts to launch the same unit of work do not overlap. The current set of mutations in the pool can be observed via the Stream method.

func NewMutationPool

func NewMutationPool[K comparable, T any](mutator *Mutator) *MutationPool[K, T]

func (*MutationPool[K, T]) Stream

func (m *MutationPool[K, T]) Stream(ctx context.Context) <-chan map[K]*Mutation[T]

Stream returns a channel on which all running mutations will be emitted until the ctx is cancelled.

type MutationProvider

type MutationProvider[T any] func(ctx context.Context) (values <-chan T)

MutationProvider represents async work typically powered by a single goroutine.

This work can emit values over the output channel to communicate state changes.

To be well behaved, the provider must exit when the context is cancelled and close the output channel before returning.

The lifetime of the context is bound to the parent Mutation and in turn the Mutator. Thus the context can be cancelled a call to Mutation.Cancel or a call to Mutator.Shutdown.

type Mutator

type Mutator struct {
	// contains filtered or unexported fields
}

Mutator manages the lifecycle of asynchronous application state mutations.

func NewMutator

func NewMutator(ctx context.Context, timeout time.Duration) *Mutator

NewMutator creates a new mutator which will spawn all mutations using the provided context. The provided timeout configures how long Mutator.Shutdown will wait before giving up on a clean shutdown, see the docs on that method for details.

func (*Mutator) Shutdown

func (m *Mutator) Shutdown() error

Shutdown the Mutator. Prevents new mutations from spawning and blocks until all existing mutations complete. Once the configured timeout elapses, the root context is cancelled and mutations have that same interval to exit cleanly. In the case that one or more mutations haven't exited in time an error is returned.

type PoolKeyer

type PoolKeyer[K comparable] interface {
	// MutationPoolKey returns a key that identifies the work performed by a mutation for deduplication
	// purposes. If two mutations have the same key, [MutationPool]s will deduplicate them so the only
	// one can execute at a time.
	MutationPoolKey() K
}

PoolKeyer describes a type that can generate a unique mutation pool key for itself.

type Provider

type Provider[T any] func(context.Context) <-chan T

Provider is a function that returns a channel of values which will be closed when the provided context is cancelled. Provider functions are usually used to provide data as input to a stream's asynchronous processing. Implementations may close the output channel early to indicate that the most recently returned value is the final value for the stream.

type ProviderR

type ProviderR[T any] func(context.Context) <-chan Result[T]

ProviderR is a function that returns a channel of [Result]s which will be closed when the provided context is cancelled. ProviderR functions are usually used to provide data and errors as input to a stream's asynchronous processing. Implementations may close the output channel early to indicate that the most recently returned value is the final value for the stream.

type Result

type Result[T any] struct {
	Err error
	V   T
}

Result is a convenience type for bundling data and an error together so they can be sent over a channel.

func ResultFrom

func ResultFrom[T any](t T, e error) Result[T]

ResultFrom constructs a Result by bundling the given t and e together.

func (Result[T]) Split

func (r Result[T]) Split() (T, error)

Split unpacks the result's value and error, returning them separately.

type ResultStream

type ResultStream[T any] Stream[Result[T]]

ResultStream provides Result[T]s from asynchronous logic. It can be read safely without blocking, and must be read each frame in order to stay active. If it goes inactive, reading it again will reactivate it automatically.

func NewR

func NewR[T any](controller *Controller, provider ProviderR[T]) *ResultStream[T]

NewR creates a stream using the provided controller and provider. The provider will only be invoked when activating or reactivating the stream, not each time the stream is read. The channel returned from the provider must not close until the context is cancelled.

func NewWithAppendR

func NewWithAppendR[T any](controller *Controller, provider ProviderR[[]T]) *ResultStream[[]T]

NewWithAppendR is NewWithMergeR with a merge func that appends new elements to existing elements. If either the old or new element contains an error, the errors will be merged with errors.Join.

func NewWithMergeR

func NewWithMergeR[T any](controller *Controller, merge func(old, new Result[T]) Result[T], provider ProviderR[T]) *ResultStream[T]

NewWithMergeR is NewR with a merge func. See NewWithMerge for details on the merge func.

func (*ResultStream[T]) Read

func (s *ResultStream[T]) Read(gtx layout.Context) (value T, status Status, err error)

Read a value from the stream, if any. The returned status indicates whether the returned value and err have a meaningful value.

func (*ResultStream[T]) ReadDefault

func (s *ResultStream[T]) ReadDefault(gtx layout.Context, t T) (T, error)

ReadDefault reads from the stream, returning the provided default value if no value is available yet on the stream.

func (*ResultStream[T]) ReadInto

func (s *ResultStream[T]) ReadInto(gtx layout.Context, t *T, def T) error

ReadInto reads from the stream and (if any value is available) assigns the latest value to t. If no value is available, def is assigned to t. The returned error is the error result of the latest stream value or nil.

func (*ResultStream[T]) ReadNew

func (s *ResultStream[T]) ReadNew(gtx layout.Context) (value T, isNew bool, err error)

ReadNew returns the current value available from the stream (which may be the zero value if no value has ever been received) and a boolean indicating whether that value is newly emitted during the current frame. This function serves to shorten the common idiom:

if value, status, err := s.Read(gtx); status == stream.Emitting {
	// Do logic that should only occur when a value is emitted from the stream.
}

It can be written as:

if value, ok, err := s.ReadNew(gtx); ok {
	// Do logic that should only occur when a value is emitted from the stream.
}

type Singleton

type Singleton[T any] struct {
	// contains filtered or unexported fields
}

Singleton is a specialized MutationPool that manages only one mutation. One instance of the mutation is allowed at a time. If the mutation ends, another is allowed to start on the next call to Singleton.Run.

func NewSingleton

func NewSingleton[T any](mutator *Mutator) *Singleton[T]

NewSingleton makes a Singleton with its lifecycle managed by the given mutator.

func (*Singleton[T]) Run

func (s *Singleton[T]) Run(provider func(ctx context.Context) <-chan T) (*Mutation[T], bool)

Run the provider as the singleton's only mutation, returning the singleton's current mutation and whether the mutation was created by this call to Singleton.Run.

type Source

type Source[S, T any] struct {
	// contains filtered or unexported fields
}

Source helps contruct [Provider]s that emit a single, shared value. It may not be suitable for all applications. Streams created using a Source will receive the latest available value, but may not receive every value emitted by the valuer if new values arrive quickly. S is an internal state type, protected by the source's lock, that is used to track the current state of the source. T is the result type emitted on streams from this source. If constructed with NewSourceCtx, a Source can be "closed" much like a channel by cancelling its input context.

func NewSource

func NewSource[S, T any](valuer func(S) (T, bool)) *Source[S, T]

NewSource constructs a source using the provided valuer function to transform its current state (S) into a T. valuer must be idempotent. The boolean return value from valuer indicates whether the T should be emitted over the stream or discarded. The valuer should deep copy all data it uses in T to ensure that other invocations of valuer do not reference the same memory.

func NewSourceCtx

func NewSourceCtx[S, T any](ctx context.Context, valuer func(S) (T, bool)) *Source[S, T]

NewSourceCtx constructs a new source that will close all output streams when the provided context is cancelled. Calls to *Source.Update after the provided context is cancelled will have no effect.

func NewSourceWithAppendCtx

func NewSourceWithAppendCtx[S, T any](ctx context.Context, valuer func(S) ([]T, bool)) *Source[S, []T]

NewSourceWithAppendCtx is NewSourceWithMergeCtx, but it uses a merge func that appends new elements to the end of old elements.

func NewSourceWithMergeCtx

func NewSourceWithMergeCtx[S, T any](ctx context.Context, merge func(old, new T) T, valuer func(S) (T, bool)) *Source[S, T]

NewSourceWithMergeCtx constructs a new source that will close all output streams when the provided context is cancelled. Calls to *Source.Update after the provided context is cancelled will have no effect.

func (*Source[S, T]) Stream

func (s *Source[S, T]) Stream(ctx context.Context) <-chan T

Stream is a Provider function. The returned channel will close when the provided context is cancelled or (if the Source was constructed with NewSourceCtx) when the source's context is cancelled, and will emit any values set by Update() for which the valuer function provided at construction returns true. If Update() is invoked quickly, only the final value is guaranteed to be emitted on the channel returned by Stream.

func (*Source[S, T]) Update

func (s *Source[S, T]) Update(fn func(oldState S) S)

Update runs fn with the source's lock held, passing the current state to fn and setting the state to the return value of fn. If this Source was constructed with NewSourceCtx and the context has been cancelled, calls to Update will have no effect.

func (*Source[S, T]) UpdateIf

func (s *Source[S, T]) UpdateIf(fn func(oldState S) (S, bool))

Update runs fn with the source's lock held, passing the current state to fn and setting the state to the return value of fn iff fn returns true. If fn returns false, the returned value is discarded and no value will be sent on the stream's output. If this Source was constructed with NewSourceCtx and the context has been cancelled, calls to UpdateIf will have no effect.

type Status

type Status uint8

Status describes the state of the data read from the stream.

const (
	// Waiting indicates that the stream has never received a value.
	Waiting Status = iota
	// Emitting indicates that the stream is emitting a new value.
	Emitting
	// Cached indicates that the stream is emitting a cached copy of the most
	// recently received value.
	Cached
	// Complete indicates that the stream provider closed its output channel after
	// emitting at least one value and without being cancelled. This indicates that
	// the work for this stream is complete and the stream need not be restarted.
	// The last value received over the channel will always be returned with this
	// status.
	Complete
	// Incomplete indicates that the stream provider closed its output channel without
	// ever emitting a value. This is usually a bug in the stream provider. Any stream
	// value received with this status should be ignored.
	Incomplete
	// Uninitialized means that the stream has never been constructed. This status is
	// only returned if a nil stream is read.
	Uninitialized
)

func (Status) String

func (s Status) String() string

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream provides Ts from asynchronous logic. It can be read safely without blocking, and must be read each frame in order to stay active. If it goes inactive, reading it again will reactivate it automatically. A nil stream can be read from safely, but it is invalid to construct a stream literal without invoking a constructor function. Such streams will panic when used.

func New

func New[T any](controller *Controller, provider Provider[T]) *Stream[T]

New creates a stream using the provided controller and provider. The provider will only be invoked when activating or reactivating the stream, not each time the stream is read. The channel returned from the provider must not close until the context is cancelled.

func NewWithAppend

func NewWithAppend[T any](controller *Controller, provider Provider[[]T]) *Stream[[]T]

NewWithAppend is like NewWithMerge, but provides a merge func that automatically appends new elements to the end of existing elements. This only works with elements of slice type.

func NewWithMerge

func NewWithMerge[T any](controller *Controller, merge func(old, new T) T, provider Provider[T]) *Stream[T]

NewWithMerge is like New, but it configures the stream to merge elements waiting for delivery to the UI using the provided merge func. This ensures that no element is dropped/overwritten by a subsequent element on its way to the UI.

func Once

func Once[T any](controller *Controller, do func(ctx context.Context) T) *Stream[T]

Once creates a stream that will only emit a single value, and will not restart itself if it completes sucessfully. This is useful for one-shot async computations.

func (*Stream[T]) Read

func (s *Stream[T]) Read(gtx layout.Context) (value T, status Status)

Read a value from the stream, if any. The returned status indicates whether the returned value and err have a meaningful value.

func (*Stream[T]) ReadDefault

func (s *Stream[T]) ReadDefault(gtx layout.Context, t T) T

ReadDefault reads from the stream, returning the provided default value if no value is available yet on the stream.

func (*Stream[T]) ReadInto

func (s *Stream[T]) ReadInto(gtx layout.Context, t *T, def T)

ReadInto reads from the stream and (if any value is available) assigns the latest value to t. If no value is available, def is assigned to t.

func (*Stream[T]) ReadNew

func (s *Stream[T]) ReadNew(gtx layout.Context) (value T, isEmitting bool)

ReadNew returns the current value available from the stream (which may be the zero value if no value has ever been received) and a boolean indicating whether that value is newly emitted during the current frame. This function serves to shorten the common idiom:

if value, status := s.Read(gtx); status == stream.Emitting {
    // Do logic that should only occur when a value is emitted from the stream.
}

It can be written as:

if value, ok := s.ReadNew(gtx); ok {
    // Do logic that should only occur when a value is emitted from the stream.
}

Many streams do not need special handling for when events are emitted, and should not use this method.

type Transmitter

type Transmitter[S, T any] struct {
	// contains filtered or unexported fields
}

Transmitter allows sending messages to current streamers without impacting future streamers. It is similar to Source except that once a source is updated, future streamers will always receive the most recent value when starting a stream. Transmitter instead sends each value to current streamers only. New streamers will not receive any values until a new call to [Broadcast] emits a new value.

func NewTransmitter

func NewTransmitter[S, T any](ctx context.Context, valuer func(S) (T, bool)) *Transmitter[S, T]

NewTransmitter constructs a new transmitter that will close all output streams when the provided context is cancelled. Calls to *Transmitter.Broadcast after the provided context is cancelled will have no effect.

func NewTransmitterWithMerge

func NewTransmitterWithMerge[S, T any](ctx context.Context, merge func(old, new T) T, valuer func(S) (T, bool)) *Transmitter[S, T]

NewTransmitter constructs a new transmitter that will close all output streams when the provided context is cancelled. Calls to *Transmitter.Broadcast after the provided context is cancelled will have no effect.

func (*Transmitter[S, T]) Broadcast

func (t *Transmitter[S, T]) Broadcast(value S)

Broadcast will send value to all current streamers. Streams started after this call will not receive the value.

func (*Transmitter[S, T]) Stream

func (t *Transmitter[S, T]) Stream(ctx context.Context) <-chan T

Stream values from the Transmitter. Values sent prior to this call returning will not be received.

type UnkeyedPool

type UnkeyedPool[T any] struct {
	// contains filtered or unexported fields
}

UnkeyedPool manages the lifecycle of a group of mutations with no uniqueness invariants. Mutations do not need to provide a unique key that deduplicates instances, and every call to Run will create a new mutation.

func NewUnkeyedPool

func NewUnkeyedPool[T any](mutator *Mutator) *UnkeyedPool[T]

NewUnkeyedPool creates a mutation pool with its lifecycle managed by the given mutator.

func (*UnkeyedPool[T]) Run

func (u *UnkeyedPool[T]) Run(provider func(ctx context.Context) <-chan T) *Mutation[T]

Run the given provider func in a mutation. This method always returns a new mutation.

Directories

Path Synopsis
Package automaton defines streamable finite state machines that allow UIs to easily model stateful, multi-step application flows.
Package automaton defines streamable finite state machines that allow UIs to easily model stateful, multi-step application flows.
Package sqlitestream provides a streamable SQLite database abstraction.
Package sqlitestream provides a streamable SQLite database abstraction.
mattn_sqlx_stream
Package mattn_sqlx_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and sqlx query helpers.
Package mattn_sqlx_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and sqlx query helpers.
mattn_stdlib_stream
Package mattn_stdlib_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and Go stdlib database/sql package.
Package mattn_stdlib_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and Go stdlib database/sql package.
sqlitewatch
Package sqlitewatch implements low-level SQLite operation watching.
Package sqlitewatch implements low-level SQLite operation watching.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL