Documentation ¶
Overview ¶
Package stream provides the ability to safely read asynchronous, dynamic application state from Gio layout code.
It does so by providing several constructs:
- Controllers, which connect the lifecycle of asynchronously generated data streams to the frame lifecycle of a Gio application window. Controllers handle invalidating the window when new data is available on active streams and also shut down streams when they are no longer in use.
- Streams, which are bound to a particular controller and provide asynchronous state updates while they are in use by visible widgets. When not in use, their state updates shut down to conserve resources.
- Transformations, which are functions that operate on stream channels and make it easy to build data streams from disparate sources of asynchronous information.
- Inputs, which provide data from the application GUI that can be safely read within a stream's asynchronous processing.
Additionally, stream provides constructs for supervising persistent or stateful asynchronous operations from your UI:
- Mutators, which maintain a list of running stateful operations and ensure that they all shut down as cleanly as possible before your application exits.
- MutationPools, which allow you to manage groups of related stateful operations and prevent duplicate operations from being created at the same time.
- Mutations, which are a (streamable) handle onto a running stateful operation.
Controllers ¶
Each window using streams needs its own Controller, which can be constructed with NewController. The controller is used to construct streams bound to that window, and has a method (Controller.Sweep) that must be invoked every frame in order to ensure that streams which are not in use go inert.
Typical use looks like this:
	func loop(w *app.Window) error {
		// Make a context that lives as long as the window.
		windowCtx, cancel := context.WithCancel(context.Background())
		defer cancel()
		// Make a controller for this window.
		controller := stream.NewController(windowCtx, w.Invalidate)

		var ops op.Ops
		for event := range w.Events() {
			switch event := event.(type) {
			case system.DestroyEvent:
				return event.Err
			case system.FrameEvent:
				gtx := layout.NewContext(&ops, event)
				// Your layout here, passing the controller so that code can instantiate streams with it.
				layoutUI(gtx, controller)
				event.Frame(gtx.Ops)
				// Transition any active unread streams to be inactive.
				controller.Sweep()
			}
		}
		return nil
	}
Providing the window's Invalidate function to NewController ensures that the controller can trigger a window invalidation when new data arrives on a stream visible within the window, ensuring that the UI is redrawn automatically.
Note that Controller.Sweep is being invoked every frame.
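To make the per-frame sweep concrete, here is a minimal, stdlib-only sketch of one way such bookkeeping could work. The `tracker` type and its `read` and `sweep` methods are hypothetical names invented for illustration; they are not the package's Controller and say nothing about how it is actually implemented.

```go
package main

import (
	"fmt"
	"sort"
)

// tracker is a hypothetical stand-in for the bookkeeping a controller
// might do; it is not the package's Controller.
type tracker struct {
	// readThisFrame maps each active stream's name to whether it was
	// read since the last sweep.
	readThisFrame map[string]bool
}

func newTracker() *tracker {
	return &tracker{readThisFrame: make(map[string]bool)}
}

// read activates the named stream if needed and marks it as used.
func (t *tracker) read(name string) {
	t.readThisFrame[name] = true
}

// sweep deactivates every stream that was not read since the previous
// sweep and returns their names in sorted order.
func (t *tracker) sweep() []string {
	var stopped []string
	for name, wasRead := range t.readThisFrame {
		if wasRead {
			t.readThisFrame[name] = false // must be read again next frame
		} else {
			delete(t.readThisFrame, name)
			stopped = append(stopped, name)
		}
	}
	sort.Strings(stopped)
	return stopped
}

func main() {
	t := newTracker()
	t.read("clock")
	t.read("inbox")
	fmt.Println(t.sweep()) // both were read this frame, so nothing stops

	t.read("clock") // "inbox" is no longer laid out
	fmt.Println(t.sweep())
}
```

The sketch shows why skipping the sweep matters: without it, nothing ever notices that a stream stopped being read.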
Streams ¶
A Stream is a restartable asynchronous computation which is automatically started and stopped based on whether it is actively used by the GUI. You can read from a stream to receive the most recent value created by the stream (if any is available yet).
Only the most recent value sent over a stream is important. Application programs should assume that any given element sent may be overridden by values sent after it. As such, you should not write streams to trickle updates over individual elements in a collection, but should instead send entire updated collections over the stream.
You can construct a Stream with New:
	// Make a stream that will emit increasing integers every second.
	myStream := stream.New(controller, func(ctx context.Context) <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			ticks := 0
			for {
				select {
				case <-ticker.C:
					ticks++
					out <- ticks
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	})
The controller provided to New will be responsible for starting/stopping the stream based on whether it is in use. The Provider function is responsible for providing a receive-only channel of values that will close when the provided context is cancelled.
In most real applications, the Provider function passed to New will perform I/O or interface with another goroutine in order to provide correct updates over the stream. See the section on Transformations for more about how to construct real-world provider functions.
Reading a stream is accomplished through one of the Read* methods. Of these, *Stream.Read is the most fundamental, and will be discussed first:
	ticks, status := myStream.Read(gtx)
	if status == stream.Waiting {
		// We haven't received a value over the stream yet.
	} else {
		// We have a value, so do something with ticks.
	}
Reading from a stream never blocks, so it's quite common to read a stream before any value is available yet. In that case, the stream's returned status will be Waiting, and the returned value should be ignored.
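The non-blocking read can be pictured with a plain channel and a `select` with a `default` branch. The following is a minimal sketch under that assumption; `cell`, `status`, and the three constants are hypothetical stand-ins that only mirror the Waiting, Emitting, and Cached outcomes described in this section, and the package may implement reads quite differently.

```go
package main

import "fmt"

// status mirrors the three read outcomes described in this section.
// It is a hypothetical stand-in, not the package's Status type.
type status int

const (
	waiting  status = iota // no value has ever arrived
	emitting               // a new value arrived during this read
	cached                 // the value was seen on an earlier read
)

// cell remembers the most recent value received from a channel.
type cell[T any] struct {
	in     <-chan T
	value  T
	filled bool
}

// read drains whatever is ready on the channel without blocking and
// reports the newest value along with how fresh it is.
func (c *cell[T]) read() (T, status) {
	fresh := false
	for {
		select {
		case v, ok := <-c.in:
			if !ok {
				c.in = nil // a nil channel is never ready, so stop polling it
				continue
			}
			c.value, c.filled, fresh = v, true, true
		default:
			switch {
			case fresh:
				return c.value, emitting
			case c.filled:
				return c.value, cached
			default:
				return c.value, waiting
			}
		}
	}
}

// readDefault substitutes def until the first value arrives.
func (c *cell[T]) readDefault(def T) T {
	v, s := c.read()
	if s == waiting {
		return def
	}
	return v
}

func main() {
	ch := make(chan int, 4)
	c := &cell[int]{in: ch}
	fmt.Println(c.readDefault(-1)) // nothing sent yet, so the default is used

	ch <- 1
	ch <- 2
	v, s := c.read()
	fmt.Println(v, s == emitting) // only the newest value survives

	v, s = c.read()
	fmt.Println(v, s == cached)
}
```

Note how the intermediate value 1 is never observed: only the most recent element matters, which is why whole collections rather than incremental updates should be sent over a stream.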
If the status is Emitting, the stream produced a new value (or error) this frame.
If the status is Cached, the returned value is not new, but has been emitted before.
Often your UI will not care about the status at all, but would rather simply work with a default value until the first data is available on the stream. To facilitate this, we have *Stream.ReadDefault:
ticks := myStream.ReadDefault(gtx, 0)
You can rely upon the returned value to either be your supplied default value or the latest value from the stream.
Finally, sometimes you want to synchronize an existing variable or struct field with the latest data from a stream. *Stream.ReadInto accomplishes this:
	var ticks int // Assume we declared this elsewhere, perhaps as a field.
	myStream.ReadInto(gtx, &ticks, 0)
As you can see, reading from a stream does not require a great deal of code unless your use-case demands special consideration of the status of the stream.
Results ¶
The Result type provides an easy way to send both a value and error across a channel packed into a single type. It isn't meant to be a general-purpose type, but rather to make it easier to propagate errors from the providers of your streams to your UI when you need to. We provide a number of helper transformations and stream types to make working with streams of results easier.
ResultStream accepts a ProviderR instead of a Provider and allows you to read the error from a stream value ergonomically. We can rewrite the above tick stream example to use Result like so:
	// Make a stream that will emit increasing integers (or an error) every second.
	myResultStream := stream.NewR(controller, func(ctx context.Context) <-chan stream.Result[int] {
		out := make(chan stream.Result[int])
		go func() {
			defer close(out)
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			ticks := 0
			for {
				select {
				case <-ticker.C:
					ticks++
					if ticks%2 == 0 {
						out <- stream.ResultFrom(ticks, nil)
					} else {
						out <- stream.ResultFrom(0, fmt.Errorf("odd number"))
					}
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	})
We can then read from it with:
	ticks, status, err := myResultStream.Read(gtx)
	if status == stream.Waiting {
		// We haven't received a value over the stream yet.
	} else if err != nil {
		// We have an error.
	} else {
		// We have a value, so do something with ticks.
	}
We also provide *ResultStream.ReadDefault and *ResultStream.ReadInto for use-cases where the status is less important.
Transformations ¶
Most of the top-level functions in this package are transformations. They make it easier to construct channels of Ts or Result[T]s, combine those channels, and change their element type. They are intended to be used within the sourceProviders passed to New and NewR. Consider the following:
	// Simple function to emit an increasing integer each time the provided duration elapses.
	func tickerProvider(ctx context.Context, dur time.Duration) <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			ticker := time.NewTicker(dur)
			defer ticker.Stop()
			ticks := 0
			for {
				select {
				case <-ticker.C:
					ticks++
					out <- ticks
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}

	// Make a stream that will emit the product of two streams of integers as a floating-point
	// value.
	myStream := stream.New(controller, func(ctx context.Context) <-chan float64 {
		everySecond := tickerProvider(ctx, time.Second)
		everyFewSeconds := tickerProvider(ctx, time.Second*3)
		products := stream.Zip(everySecond, everyFewSeconds, func(a, b int) int {
			return a * b
		})
		asFloats := stream.Transform(products, func(a int) float64 {
			return float64(a)
		})
		return asFloats
	})
As you can see, these functions make it relatively easy to leverage multiple Providers and ProviderRs and combine their results. The above could have converted the output type to be a float64 within the Zip call, but uses the long way to demonstrate how to use Transform. Note that each transformation employed in constructing a stream will add some latency before the UI receives new values, so try not to let your transformation pipeline get too deep.
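To illustrate where per-stage latency can come from, here is a stdlib-only sketch of a channel-mapping stage. The `transform` function is a hypothetical re-implementation with the same shape as Transform's signature, written for illustration only; the package's actual implementation may differ.

```go
package main

import "fmt"

// transform is a hypothetical stand-in with the same shape as
// stream.Transform: it maps every input element through f.
func transform[T, U any](in <-chan T, f func(T) U) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out) // closing propagates downstream when in closes
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()

	// Two chained stages: each one adds a goroutine and a channel hop
	// between the source and the final consumer.
	asFloats := transform(in, func(i int) float64 { return float64(i) })
	halves := transform(asFloats, func(f float64) float64 { return f / 2 })
	for h := range halves {
		fmt.Println(h)
	}
}
```

In this sketch every stage hands values across an unbuffered channel, so a value must be scheduled through one more goroutine per stage before the consumer sees it.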
Transformations with an R suffix work with streams of Result elements and provide extra behaviors to make error handling easier.
Inputs ¶
If your user interface needs to perform a complex filter operation (too expensive to do directly within the UI), you can use an Input to push the filter criteria into your sourceProvider. Construct an Input with its initial value using NewInput.
You can then reference that input within a Provider function for a stream:
	// Make a stream that will emit the product of the counter input and a value that increments
	// each second.
	count := stream.NewInput(5)
	myStream := stream.New(controller, func(ctx context.Context) <-chan int {
		everySecond := tickerProvider(ctx, time.Second)
		return stream.Zip(everySecond, count.Stream(ctx), func(a, b int) int {
			return a * b
		})
	})
As the user interacts with your UI, you can update the value of your input with Input.Send:
count.Send(10)
This method will not block unless you are performing concurrent sends, and will enable your sourceProvider to consume whatever value you send. Much like streams, only the most recently-sent value is guaranteed to be consumed by the stream.
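The "only the most recent value is guaranteed to be consumed" behavior can be modelled as a single-slot mailbox. The sketch below is an assumption-laden illustration: `mailbox`, `newMailbox`, and `send` are hypothetical names, and nothing here is taken from the Input implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// mailbox is a hypothetical single-slot holder: a new send replaces
// any value that has not been consumed yet.
type mailbox[T any] struct {
	mu sync.Mutex // serializes senders
	ch chan T     // capacity 1: holds at most the latest value
}

func newMailbox[T any]() *mailbox[T] {
	return &mailbox[T]{ch: make(chan T, 1)}
}

// send stores v as the pending value, discarding a stale one if present.
func (m *mailbox[T]) send(v T) {
	m.mu.Lock()
	defer m.mu.Unlock()
	select {
	case <-m.ch: // drop the value nobody consumed
	default: // slot was already empty
	}
	// This cannot block: the slot is empty and only one sender runs at a time.
	m.ch <- v
}

func main() {
	m := newMailbox[int]()
	m.send(5)
	m.send(10) // replaces 5 before any consumer saw it
	fmt.Println(<-m.ch)
}
```

In this sketch a sender waits only for another sender holding the lock, never for the consumer, which is one way to get the blocking behavior described above.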
Mutators ¶
A Mutator is created at the start of an application to supervise stateful operations. Creating one looks like this:
	// Make a context that will last the lifetime of your application.
	appCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	mutator := stream.NewMutator(appCtx, time.Second*5)

	// Run your application here, blocking until you want to end the app.

	// Block until running mutations end.
	mutator.Shutdown()
The mutator will wait up to the duration provided in its constructor for all running mutations to end. After that it will cancel any mutation contexts that are not already cancelled and wait up to the provided duration again. Then it will unblock and allow the application process to exit.
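The two-phase wait described above (grace period, cancel, second grace period) can be sketched with a `sync.WaitGroup` and timers. Everything in this block is hypothetical: `supervisor`, `run`, `shutdown`, and `demo` are illustrative names, and the sketch shares one context among all tasks, which is a simplification rather than a description of how Mutator manages mutation contexts.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// supervisor is a hypothetical stand-in for a shutdown coordinator.
type supervisor struct {
	ctx    context.Context
	cancel context.CancelFunc
	grace  time.Duration
	wg     sync.WaitGroup
}

func newSupervisor(parent context.Context, grace time.Duration) *supervisor {
	ctx, cancel := context.WithCancel(parent)
	return &supervisor{ctx: ctx, cancel: cancel, grace: grace}
}

// run starts task in its own goroutine and tracks its completion.
func (s *supervisor) run(task func(ctx context.Context)) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		task(s.ctx)
	}()
}

// shutdown reports whether every task ended within the two grace periods.
func (s *supervisor) shutdown() bool {
	done := make(chan struct{})
	go func() {
		s.wg.Wait()
		close(done)
	}()
	// Phase 1: give tasks a chance to finish on their own.
	select {
	case <-done:
		s.cancel()
		return true
	case <-time.After(s.grace):
	}
	// Phase 2: cancel, then wait one more grace period.
	s.cancel()
	select {
	case <-done:
		return true
	case <-time.After(s.grace):
		return false // give up so the process can exit
	}
}

// demo runs one cooperative task and one stubborn task through shutdown.
func demo() (cooperative, stubborn bool) {
	a := newSupervisor(context.Background(), 50*time.Millisecond)
	a.run(func(ctx context.Context) { <-ctx.Done() }) // exits once cancelled
	cooperative = a.shutdown()

	b := newSupervisor(context.Background(), 50*time.Millisecond)
	b.run(func(ctx context.Context) { select {} }) // ignores cancellation
	stubborn = b.shutdown()
	return cooperative, stubborn
}

func main() {
	cooperative, stubborn := demo()
	fmt.Println("cooperative task shut down cleanly:", cooperative)
	fmt.Println("stubborn task shut down cleanly:", stubborn)
}
```

The stubborn task shows why mutations should honor cancellation promptly: in this sketch, shutdown simply stops waiting for it.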
MutationPools ¶
A MutationPool wraps a Mutator and provides the ability to launch stateful operations associated with a unique "key". This key mechanism can be used to ensure that two logically-identical mutations are not created at the same time for applications that need that.
A mutation pool is created like so:
	// Assuming you already have a mutator.
	var mutator *stream.Mutator
	// Create a pool using ints as the type of the unique key and time.Time values as the value type
	// emitted by the supervised mutations.
	tickerPool := stream.NewMutationPool[int, time.Time](mutator)
You can use any comparable type as a key, and any type as the mutation value type. Usually applications will want to encode identifiers for what a mutation is acting upon and how into the key. The value type should be whatever the result of the mutation is.
You can stream running mutations from the UI with:
	// Assuming the MutationPool above and a stream controller.
	runningTickersStream := stream.New(controller, tickerPool.Stream)
	running, status := runningTickersStream.Read(gtx)
Mutations ¶
A Mutation is a stateful process running asynchronously. You can use them for almost anything, from modifying a database to tracking the progress of a complex user flow through your application. The important difference between a mutation and a normal stream is that a mutation is *not* cancelled when the UI element that launched it stops being drawn. Mutations persist until they complete or the application shuts down (at which point the supervising Mutator will try to ensure that mutations have a chance to complete before killing them).
Mutations can take several forms:
- Processes that make a change to application state (like a database query or API request) and then exit.
- Processes that manage a multi-step sequence of operations (like guiding a user through a tour of an application across many different pages) and then exit when done.
- Processes that run providing useful services for the lifetime of your application (a mutation can maintain a frequently-needed, frequently-updated value for easy access by your UI).
The only requirement for a mutation (encapsulated by the MutationProvider type) is that it eventually closes its output channel (signalling that it is complete). Mutations must terminate themselves as quickly as possible (closing their output channel) when their context is cancelled.
To launch a mutation, you need a MutationPool to host it, and the mutation must result in a sequence of values matching the MutationPool's value type. To reuse the pool from the previous example blocks, we need a mutation that produces a sequence of time.Time values.
	key := 0
	mutation, isNew := stream.Mutate(tickerPool, key, func(ctx context.Context) <-chan time.Time {
		out := make(chan time.Time)
		go func() {
			defer close(out)
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case t := <-ticker.C:
					out <- t
				}
			}
		}()
		return out
	})
The above creates a mutation that emits times from a persistent ticker. That ticker will only stop when the mutation's context is cancelled. The returned mutation value is a handle on this running mutation. It can be used to stream values from the output channel of a mutation. The isNew return value tells the caller whether the mutation pool has returned an already-running mutation for the same key, or has created a new mutation for that key.
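The key-based deduplication and the meaning of isNew can be illustrated with a small map guarded by a mutex. This is a sketch only: `pool`, `handle`, `launch`, and `finish` are hypothetical names, and the real MutationPool additionally supervises goroutines and contexts, which this sketch omits.

```go
package main

import (
	"fmt"
	"sync"
)

// handle is a hypothetical stand-in for a running operation.
type handle struct{ id int }

// pool hands out at most one live handle per key.
type pool[K comparable] struct {
	mu      sync.Mutex
	running map[K]*handle
	nextID  int
}

func newPool[K comparable]() *pool[K] {
	return &pool[K]{running: make(map[K]*handle)}
}

// launch returns the existing handle for key if one is running;
// otherwise it registers a new one. isNew distinguishes the two cases.
func (p *pool[K]) launch(key K) (h *handle, isNew bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if existing, ok := p.running[key]; ok {
		return existing, false
	}
	p.nextID++
	h = &handle{id: p.nextID}
	p.running[key] = h
	return h, true
}

// finish forgets the key so that a later launch starts fresh.
func (p *pool[K]) finish(key K) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.running, key)
}

func main() {
	p := newPool[string]()
	first, isNew := p.launch("save:doc-1")
	fmt.Println(first.id, isNew)

	again, isNew := p.launch("save:doc-1") // same key while still running
	fmt.Println(again == first, isNew)

	p.finish("save:doc-1")
	_, isNew = p.launch("save:doc-1") // the key is free again
	fmt.Println(isNew)
}
```

Encoding both the target and the action into the key (as in the hypothetical "save:doc-1") is what lets callers treat two requests as logically identical.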
You can read the results of a mutation from the UI by creating a stream:
	tickStream := stream.New(controller, mutation.Stream)
	tick, status := tickStream.Read(gtx)
Like any other stream, you can apply transformation functions within a MutationProvider to modify the output of the mutation for *all* consumers or you can apply transformation functions in a Provider to modify the output for just the current stream.
Index ¶
- Variables
- func Copy[T any](in <-chan T, deepCopy func(T) T) (<-chan T, <-chan T)
- func CopyX[T any](in <-chan Result[T], deepCopy func(T) T) (<-chan Result[T], <-chan Result[T])
- func Debounce[T any](input <-chan T, interval time.Duration) <-chan T
- func DebounceUntilStable[T any](input <-chan T, interval time.Duration) <-chan T
- func DeepCopyResult[T any](deepCopyValue func(T) T) func(Result[T]) Result[T]
- func Delayed[T, U any](input <-chan U, ctx context.Context, provider func(context.Context) <-chan T) <-chan T
- func Distinct[T comparable](input <-chan T) <-chan T
- func DistinctFunc[T any](input <-chan T, same func(a, b T) bool) <-chan T
- func DistinctFuncR[T any](input <-chan Result[T], same func(a, b T, aErr, bErr error) bool) <-chan Result[T]
- func DistinctFuncX[T any](input <-chan Result[T], same func(old, new T) bool) <-chan Result[T]
- func DistinctR[T comparable](input <-chan Result[T]) <-chan Result[T]
- func Filter[T, U any](in <-chan T, keep func(T) (U, bool)) <-chan U
- func FilterR[T, U any](in <-chan Result[T], keep func(T, error) (U, error, bool)) <-chan Result[U]
- func FilterX[T, U any](in <-chan Result[T], keep func(T) (U, bool)) <-chan Result[U]
- func Join[T, U any](ctx context.Context, inputs []<-chan T, deepCopy func(T) T, ...) <-chan U
- func JoinLatest[T, Out any](ctx context.Context, inputs []<-chan T, deepCopy func(T) T, ...) <-chan Out
- func Latest[T any](in <-chan T) <-chan T
- func LatestFilter[In any, Out any](inFn func() <-chan In, tx func(In) (Out, bool)) chan Out
- func MapRebuilder[In ~map[K]T, Out any, K comparable, T comparable](input <-chan In, provider func(context.Context, In) <-chan Out) <-chan Out
- func Multiplex[In, Out, State any](input <-chan In, ...) <-chan Out
- func MultiplexR[In, Out, State any](input <-chan Result[In], ...) <-chan Result[Out]
- func PostProcess[T any](input <-chan T, f func(t T) T) <-chan T
- func PostProcessR[T any](input <-chan Result[T], f func(t T, err error) (T, error)) <-chan Result[T]
- func ProviderRebuilder[In, Out any](input <-chan In, deepCopy func(In) In, equal func(a, b In) bool, ...) <-chan Out
- func RebuildElements[In, Out any](input <-chan []In, deepCopyInput func(In) In, deepCopyOutput func(Out) Out, ...) <-chan []Out
- func RebuildElementsX[In, Out any](input <-chan Result[[]In], deepCopyInput func(In) In, ...) <-chan Result[[]Out]
- func RebuilderX[In, Out any](input <-chan Result[In], deepCopy func(In) In, equal func(a, b In) bool, ...) <-chan Result[Out]
- func Resulter[T any](fn func(ctx context.Context) (T, error)) func(context.Context) Result[T]
- func SliceRebuilder[In ~[]T, Out any, T comparable](input <-chan In, provider func(context.Context, In) <-chan Out) <-chan Out
- func Transform[T, U any](input <-chan T, transformer func(T) U) <-chan U
- func TransformR[T, U any](input <-chan Result[T], transformer func(T, error) (U, error)) <-chan Result[U]
- func TransformX[T, U any](input <-chan Result[T], transformer func(T) (U, error)) <-chan Result[U]
- func Value[T any](ctx context.Context, f func(ctx context.Context) T) <-chan T
- func ValueProvider[T any](f func(ctx context.Context) T) func(ctx context.Context) <-chan T
- func WrapR[T any](input <-chan T) <-chan Result[T]
- func Zip[T, U, Out any](a <-chan T, b <-chan U, zipFunc func(a T, b U) Out) <-chan Out (deprecated)
- func ZipCopy[T, U, Out any](a <-chan T, b <-chan U, deepCopyA func(T) T, deepCopyB func(U) U, ...) <-chan Out
- func ZipCopyX[T, U, Out any](a <-chan Result[T], b <-chan Result[U], deepCopyA func(T) T, ...) <-chan Result[Out]
- func ZipR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], ...) <-chan Result[Out] (deprecated)
- func ZipWrapR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out] (deprecated)
- func ZipX[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out] (deprecated)
- type ARCPool
- type ComparableKeyer
- type Controller
- type ErrUncleanMutatorShutdown
- type Input
- type KeyedMutationPool
- type Mutation
- func Mutate[K comparable, T any](pool *MutationPool[K, T], key K, provider MutationProvider[T]) (mut *Mutation[T], isNew bool)
- func MutateKeyed[V PoolKeyer[K], K comparable, T any](pool *MutationPool[K, T], v V, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)
- func MutateSimple[K comparable, T any](pool *MutationPool[K, T], key K, provider func(context.Context) T) (mut *Mutation[T], isNew bool)
- func MutateTimeout[K comparable, T any](pool *MutationPool[K, T], key K, timeout time.Duration, ...) (mut *Mutation[T], isNew bool)
- func PersistentStream[K comparable, T any](pool *MutationPool[K, T], key K, prov Provider[T]) (mut *Mutation[T], isNew bool)
- type MutationPool
- type MutationProvider
- type Mutator
- type PoolKeyer
- type Provider
- type ProviderR
- type Result
- type ResultStream
- func (s *ResultStream[T]) Read(gtx layout.Context) (value T, status Status, err error)
- func (s *ResultStream[T]) ReadDefault(gtx layout.Context, t T) (T, error)
- func (s *ResultStream[T]) ReadInto(gtx layout.Context, t *T, def T) error
- func (s *ResultStream[T]) ReadNew(gtx layout.Context) (value T, isNew bool, err error)
- type Singleton
- type Source
- func NewSource[S, T any](valuer func(S) (T, bool)) *Source[S, T]
- func NewSourceCtx[S, T any](ctx context.Context, valuer func(S) (T, bool)) *Source[S, T]
- func NewSourceWithAppendCtx[S, T any](ctx context.Context, valuer func(S) ([]T, bool)) *Source[S, []T]
- func NewSourceWithMergeCtx[S, T any](ctx context.Context, merge func(old, new T) T, valuer func(S) (T, bool)) *Source[S, T]
- type Status
- type Stream
- func New[T any](controller *Controller, provider Provider[T]) *Stream[T]
- func NewWithAppend[T any](controller *Controller, provider Provider[[]T]) *Stream[[]T]
- func NewWithMerge[T any](controller *Controller, merge func(old, new T) T, provider Provider[T]) *Stream[T]
- func Once[T any](controller *Controller, do func(ctx context.Context) T) *Stream[T]
- type Transmitter
- type UnkeyedPool
Variables ¶
var ErrNilController = fmt.Errorf("stream has nil controller")
ErrNilController indicates that an improperly-constructed stream was read. This always indicates a bug constructing the stream, usually making a stream from a struct literal instead of using the constructor function.
var ErrNilProvider = fmt.Errorf("stream has nil provider function")
ErrNilProvider indicates that an improperly-constructed stream was read. This always indicates a bug constructing the stream, usually making a stream from a struct literal instead of using the constructor function.
Functions ¶
func Copy ¶
func Copy[T any](in <-chan T, deepCopy func(T) T) (<-chan T, <-chan T)
Copy takes an input channel and splits it into two output channels. On each input, deepCopy will be used to produce two copies of the value which are sent on the output channels.
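The reason for the deepCopy parameter is easiest to see with a reference type such as a slice. Below is a stdlib-only sketch of a channel splitter; `split` is a hypothetical re-implementation that matches Copy's signature for illustration and should not be read as the package's code.

```go
package main

import "fmt"

// split is a hypothetical stand-in with the same shape as Copy: each
// input is deep-copied twice so the two consumers never share memory.
func split[T any](in <-chan T, deepCopy func(T) T) (<-chan T, <-chan T) {
	a, b := make(chan T), make(chan T)
	go func() {
		defer close(a)
		defer close(b)
		for v := range in {
			a <- deepCopy(v)
			b <- deepCopy(v)
		}
	}()
	return a, b
}

func main() {
	in := make(chan []int, 1)
	in <- []int{1, 2}
	close(in)

	clone := func(s []int) []int { return append([]int(nil), s...) }
	a, b := split(in, clone)

	x := <-a
	x[0] = 99 // mutating one copy must not affect the other
	y := <-b
	fmt.Println(x, y)
}
```

In this simplified version both outputs must be consumed for the pipeline to make progress, since each send waits for its reader.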
func CopyX ¶
CopyX takes an input channel and splits it into two output channels. If an error result is input, the error will be sent in two results as output (the error value is not copied). If a non-error result is input, deepCopy will be used to produce two copies of the value which are sent on the output channels.
func Debounce ¶
Debounce coalesces values received on the input channel so that at most one value is emitted per interval. A value that arrives while Debounce is not waiting out an interval is emitted immediately; values that arrive within the interval after that emission are collected, and only the final one is emitted.
func DebounceUntilStable ¶
DebounceUntilStable emits a value one `interval` after the last received input. If the inputs are rapid, the timer will reset on each input until the inputs settle enough for interval to pass between them.
func DeepCopyResult ¶
DeepCopyResult returns a deep-copy function for Result[T] built from the provided function that deeply copies values. If the Result contains an error instead of a value, the error is not copied but propagated as-is.
func Delayed ¶
func Delayed[T, U any](input <-chan U, ctx context.Context, provider func(context.Context) <-chan T) <-chan T
Delayed waits until input is closed to invoke provider. If the provided context is cancelled, it will immediately close its output channel and exit. If the input does close, the provider func will be invoked with the provided context.
func Distinct ¶
func Distinct[T comparable](input <-chan T) <-chan T
Distinct is DistinctFunc using "a == b" as the same func.
func DistinctFunc ¶
DistinctFunc returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.
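As a rough illustration of "not the same as the last emitted element", here is a stdlib-only sketch. `distinctFunc` is a hypothetical re-implementation with the same shape as DistinctFunc's signature from the index; the package's own behavior on edge cases may differ.

```go
package main

import "fmt"

// distinctFunc is a hypothetical stand-in with the same shape as
// DistinctFunc: it suppresses elements equal to the last one emitted.
func distinctFunc[T any](in <-chan T, same func(a, b T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var last T
		have := false // the first element is always emitted
		for v := range in {
			if have && same(last, v) {
				continue
			}
			last, have = v, true
			out <- v
		}
	}()
	return out
}

func main() {
	in := make(chan int, 5)
	for _, v := range []int{1, 1, 2, 2, 1} {
		in <- v
	}
	close(in)

	// Only consecutive repeats are dropped; the final 1 differs from
	// the previously emitted 2, so it is emitted again.
	for v := range distinctFunc(in, func(a, b int) bool { return a == b }) {
		fmt.Println(v)
	}
}
```

The comparison is against the last emitted value rather than every value seen so far, so this is change detection, not set-style deduplication.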
func DistinctFuncR ¶
func DistinctFuncR[T any](input <-chan Result[T], same func(a, b T, aErr, bErr error) bool) <-chan Result[T]
DistinctFuncR returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.
func DistinctFuncX ¶
DistinctFuncX returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.
func DistinctR ¶
func DistinctR[T comparable](input <-chan Result[T]) <-chan Result[T]
DistinctR is DistinctFuncR using "a == b && errors.Is(aErr, bErr)" as the same func.
func Filter ¶
Filter selectively drops elements from the input channel, opting to send nothing on the output. If the keep function returns true, the returned U will be emitted. Otherwise it will be dropped.
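The keep function both decides whether to emit and converts the element type. A stdlib-only sketch of that contract follows; `filter` is a hypothetical re-implementation matching Filter's signature in the index, offered for illustration rather than as the package's code.

```go
package main

import "fmt"

// filter is a hypothetical stand-in with the same shape as Filter:
// keep converts each T to a U and reports whether to emit it.
func filter[T, U any](in <-chan T, keep func(T) (U, bool)) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			if u, ok := keep(v); ok {
				out <- u
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 4; i++ {
			in <- i
		}
	}()

	// Keep even numbers only, converting them to labelled strings.
	evens := filter(in, func(i int) (string, bool) {
		return fmt.Sprintf("even:%d", i), i%2 == 0
	})
	for s := range evens {
		fmt.Println(s)
	}
}
```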
func FilterR ¶
FilterR implements Filter for Result channels, automatically unpacking the value/error of the input and repacking the U and error returned by keep into a Result.
func FilterX ¶
FilterX selectively drops elements from the input channel, opting to send nothing on the output. If the keep function returns true, the returned U will be emitted. Otherwise it will be dropped. If the input value is an error result, the keep function will never be invoked and the output will be a result containing the same error. This prevents errors from being silently dropped.
func Join ¶
func Join[T, U any](ctx context.Context, inputs []<-chan T, deepCopy func(T) T, merge func(old, new T) T, join func(a []T) U) <-chan U
Join combines the output elements of all of the channels in inputs using the join func. Join will not invoke join until each element of inputs has emitted at least one value. Each time a value is received from a channel in inputs, the value will be merged with the prior value emitted by that channel (if any) using the provided merge func. join will always be invoked with the most recent (merged) value emitted by each channel in inputs (ordered identically to the inputs that produced them). Join will shut down and close its output if the provided context is cancelled or if all input channels close. If there are zero inputs, Join will call join with a zero-length slice of T and emit that value. It will close its output when ctx is cancelled.
func JoinLatest ¶
func JoinLatest[T, Out any](ctx context.Context, inputs []<-chan T, deepCopy func(T) T, joinFunc func(a []T) Out) <-chan Out
JoinLatest is like Join, but always uses the new value in its merge func.
func Latest ¶
func Latest[T any](in <-chan T) <-chan T
Latest creates a buffered channel that will exhibit no backpressure. While waiting to send its next output element, it will always be ready to receive a new input, and it will replace its next outbound element with any new element. This is useful when you want to skip doing the work for intermediate values when multiple values are flowing through a stream pipeline.
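One way to get "always ready to receive, newest value wins" is a goroutine holding a single pending slot and enabling its send case only when the slot is full. The sketch below takes that approach; `latest` is a hypothetical re-implementation, it uses an unbuffered output plus a pending variable rather than a buffered channel, and flushing the pending value when the input closes is a choice made for this sketch, not documented behavior.

```go
package main

import "fmt"

// latest is a hypothetical stand-in with the same shape as Latest: it
// never makes the sender wait on a slow consumer, and it overwrites
// the pending value whenever a newer one arrives.
func latest[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var pending T
		have := false
		for {
			// A nil send channel disables the send case until a value is pending.
			var send chan<- T
			if have {
				send = out
			}
			select {
			case v, ok := <-in:
				if !ok {
					if have {
						out <- pending // flush the final value before closing
					}
					return
				}
				pending, have = v, true // replace any unsent value
			case send <- pending:
				have = false
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	out := latest(in)

	// Nobody is reading out yet, but these sends still complete.
	in <- 1
	in <- 2
	in <- 3
	close(in)

	for v := range out {
		fmt.Println(v) // intermediate values 1 and 2 were skipped
	}
}
```

Placing such a stage before an expensive transformation means the expensive work runs only for the newest value instead of for every intermediate one.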
func LatestFilter ¶
LatestFilter filters values, capturing the latest value, without backpressure.
The input channel is provided by a function executed within the processing goroutine. This allows the construction of the channel to block if necessary. The inFn may return nil to indicate that the LatestFilter should immediately shut down instead of processing input.
Exits when the channel returned by inFn closes.
func MapRebuilder ¶
func MapRebuilder[In ~map[K]T, Out any, K comparable, T comparable](input <-chan In, provider func(context.Context, In) <-chan Out) <-chan Out
MapRebuilder is a simplified ProviderRebuilder for use with map elements that have comparable keys and values.
func Multiplex ¶
func Multiplex[In, Out, State any]( input <-chan In, choose func(ctx context.Context, state State, val In) (<-chan Out, State), ) <-chan Out
Multiplex allows dynamically reconfiguring its output stream based on the last value of its input stream. The provided choose function is given the latest results from the input channel, and is responsible for choosing whether or not to change the output stream. If the choose function returns a non-nil channel, the Multiplex output channel will emit values read from that channel until the next input value arrives. Note that choose must return a non-nil stream at least once in order for any output values to ever be emitted.
The ctx passed to choose is a new context that can be used to construct a new stream channel from a provider function. When choose returns a non-nil channel, any previously-created channel will have its context cancelled, ensuring no leak of goroutines.
The State type allows the choose function to implement stateful operations, like ensuring that it isn't about to return a new copy of the same stream channel it emitted on its previous invocation. The choose function accepts and returns a variable of type State, and the returned state will always be passed back to the next invocation of choose (even if the choose func returns a nil channel, the returned state will be passed to the next call to choose).
func MultiplexR ¶
func MultiplexR[In, Out, State any]( input <-chan Result[In], choose func(ctx context.Context, state State, val In, err error) (<-chan Result[Out], State), ) <-chan Result[Out]
MultiplexR allows dynamically reconfiguring its output stream based on the last value of its input stream. The provided choose function is given the latest results from the input channel, and is responsible for choosing whether or not to change the output stream. If the choose function returns a non-nil channel, the MultiplexR output channel will emit values read from that channel until the next input value arrives. Note that choose must return a non-nil stream at least once in order for any output values to ever be emitted.
The ctx passed to choose is a new context that can be used to construct a new stream channel from a provider function. When choose returns a non-nil channel, any previously-created channel will have its context cancelled, ensuring no leak of goroutines.
The State type allows the choose function to implement stateful operations, like ensuring that it isn't about to return a new copy of the same stream channel it emitted on its previous invocation. The choose function accepts and returns a variable of type State, and the returned state will always be passed back to the next invocation of choose (even if the choose func returns a nil channel, the returned state will be passed to the next call to choose).
func PostProcess ¶
func PostProcess[T any](input <-chan T, f func(t T) T) <-chan T
PostProcess immediately emits any value received from input, but asynchronously invokes f on any values received and emits the resulting data afterward. This can be used to perform a time-consuming post-processing step while also getting raw data through the entire stream pipeline with as little latency as possible. This operation only makes sense for data that can be used in its raw form as well as its processed form, like images.
func PostProcessR ¶
func PostProcessR[T any](input <-chan Result[T], f func(t T, err error) (T, error)) <-chan Result[T]
PostProcessR immediately emits any value received from input, but asynchronously invokes f on any values received and emits the resulting data afterward. This can be used to perform a time-consuming post-processing step while also getting raw data through the entire stream pipeline with as little latency as possible. This operation only makes sense for data that can be used in its raw form as well as its processed form, like images.
func ProviderRebuilder ¶
func ProviderRebuilder[In, Out any]( input <-chan In, deepCopy func(In) In, equal func(a, b In) bool, provider func(context.Context, In) <-chan Out, ) <-chan Out
ProviderRebuilder uses an input stream to configure an output stream. The first time a value is received on input, the provider function will be invoked on that input to produce an output stream.
Each subsequent value received on input will be compared with the previous value for equality, using the provided equal function, and the provider function will be re-run when the new input value is not considered equal. The previous invocation of the provider will have its context cancelled and its output channel drained.
The provider function may return nil to signal that the previous provider's output (if any) should still be used.
The provided deepCopy function must make a copy of In that is equal to In and shares no memory with it.
func RebuildElements ¶
func RebuildElements[In, Out any]( input <-chan []In, deepCopyInput func(In) In, deepCopyOutput func(Out) Out, equal func(a, b In) bool, provider func(context.Context, In) <-chan Out, ) <-chan []Out
RebuildElements is a variant of ProviderRebuilder that operates on slices of input data. It only requires deepCopy and equality operations for element types, not entire slices.
func RebuildElementsX ¶
func RebuildElementsX[In, Out any]( input <-chan Result[[]In], deepCopyInput func(In) In, deepCopyOutput func(Out) Out, equal func(a, b In) bool, provider func(context.Context, In) <-chan Result[Out], ) <-chan Result[[]Out]
RebuildElementsX is RebuildElements for Result-wrapped slices.
func RebuilderX ¶
func RebuilderX[In, Out any]( input <-chan Result[In], deepCopy func(In) In, equal func(a, b In) bool, provider func(context.Context, In) <-chan Result[Out], ) <-chan Result[Out]
RebuilderX is like ProviderRebuilder, but handles result-value streams by propagating the errors automatically. If input values contain errors, they will be passed along without invoking the user-provided functions, transformed to values on an appropriate output result channel.
func Resulter ¶
Resulter adapts a (T, error) return into a Result[T] for convenient use with result based APIs.
func SliceRebuilder ¶
func SliceRebuilder[In ~[]T, Out any, T comparable](input <-chan In, provider func(context.Context, In) <-chan Out) <-chan Out
SliceRebuilder is a simplified stream.ProviderRebuilder specialized to slices of comparable data.
func Transform ¶
func Transform[T, U any](input <-chan T, transformer func(T) U) <-chan U
Transform returns a new channel which will emit every value sent over input transformed by the transformer function.
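As a rough picture of what such a helper does, a channel map can be written in a few lines. The lowercase transform and wordLengths below are hypothetical names for this sketch and are not the package's source.

```go
package main

import "fmt"

// transform is a hypothetical, minimal channel map: every value read from
// input is passed through f and sent on the returned channel, which closes
// when input closes.
func transform[T, U any](input <-chan T, f func(T) U) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range input {
			out <- f(v)
		}
	}()
	return out
}

// wordLengths maps each word to its length through transform.
func wordLengths(words []string) []int {
	input := make(chan string, len(words))
	for _, w := range words {
		input <- w
	}
	close(input)
	var got []int
	for n := range transform(input, func(s string) int { return len(s) }) {
		got = append(got, n)
	}
	return got
}

func main() {
	fmt.Println(wordLengths([]string{"gio", "stream"})) // prints [3 6]
}
```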
func TransformR ¶
func TransformR[T, U any](input <-chan Result[T], transformer func(T, error) (U, error)) <-chan Result[U]
TransformR returns a new channel which will emit every value sent over input transformed by the transformer function.
func TransformX ¶
TransformX returns a new channel which will emit every value sent over input transformed by the transformer function. The transformer function will not be invoked for inputs that contain an error. The error will be automatically re-emitted as a Result[U].
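The error short-circuit can be sketched like this. The result type, its field names, and the transformer signature below are assumptions made for the illustration, since neither Result's fields nor TransformX's signature appear in this reference; transformX and runTransformX are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical stand-in for a value-plus-error pair.
type result[T any] struct {
	Value T
	Err   error
}

// transformX sketches the rule described above: inputs that carry an error
// skip f entirely and are re-emitted as an error of the output type.
func transformX[T, U any](input <-chan result[T], f func(T) (U, error)) <-chan result[U] {
	out := make(chan result[U])
	go func() {
		defer close(out)
		for r := range input {
			if r.Err != nil {
				out <- result[U]{Err: r.Err}
				continue
			}
			v, err := f(r.Value)
			out <- result[U]{Value: v, Err: err}
		}
	}()
	return out
}

// runTransformX pushes one good and one failed input through transformX and
// reports the outputs plus how many times the transformer ran.
func runTransformX() (lines []string, calls int) {
	input := make(chan result[int], 2)
	input <- result[int]{Value: 2}
	input <- result[int]{Err: errors.New("boom")}
	close(input)
	double := func(n int) (string, error) {
		calls++
		return fmt.Sprint(n * 2), nil
	}
	for r := range transformX(input, double) {
		if r.Err != nil {
			lines = append(lines, "error: "+r.Err.Error())
			continue
		}
		lines = append(lines, r.Value)
	}
	return lines, calls
}

func main() {
	fmt.Println(runTransformX()) // prints [4 error: boom] 1
}
```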
func Value ¶
Value returns a channel built from [f]. The channel will emit whatever [f] returns, and will close when the provided context is closed.
func ValueProvider ¶
ValueProvider is like Value, but returns a Provider function. This makes it easier to insert inline into calls to other helpers that expect a Provider.
func Zip
deprecated
func Zip[T, U, Out any](a <-chan T, b <-chan U, zipFunc func(a T, b U) Out) <-chan Out
Zip combines two streams of types T and U into a stream of type Out. Zip works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked to synthesize an output value. The output channel will close when both input channels close.
Deprecated: This function makes it too easy to accidentally alias mutable memory further in the pipeline. For this function to be safe, the zipFunc must copy all values it uses, and that's too easy to forget to do. Use ZipCopy instead.
func ZipCopy ¶
func ZipCopy[T, U, Out any](a <-chan T, b <-chan U, deepCopyA func(T) T, deepCopyB func(U) U, zipFunc func(a T, b U) Out) <-chan Out
ZipCopy combines two streams of types T and U into a stream of type Out. ZipCopy works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked with a copy of the input values to synthesize an output value. The output channel will close when both input channels close.
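To see why the copies matter, consider a zip function that mutates its arguments. The sketch below is a simplified illustration and not the package's implementation; zipLatest, cloneInts, and runZipDemo are hypothetical names. The stored latest values stay intact because zip only ever sees copies.

```go
package main

import "fmt"

// zipLatest is a hypothetical latest-value zip. It remembers the newest value
// from each input and, once both have produced one, calls zip with copies.
func zipLatest[T, U, Out any](
	a <-chan T, b <-chan U,
	copyA func(T) T, copyB func(U) U,
	zip func(T, U) Out,
) <-chan Out {
	out := make(chan Out)
	go func() {
		defer close(out)
		var (
			lastA        T
			lastB        U
			haveA, haveB bool
			pending      Out
			send         chan Out // nil unless pending holds an undelivered value
		)
		for a != nil || b != nil || send != nil {
			select {
			case v, ok := <-a:
				if !ok {
					a = nil
					continue
				}
				lastA, haveA = v, true
			case v, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				lastB, haveB = v, true
			case send <- pending:
				send = nil
				continue
			}
			if haveA && haveB {
				// A newer result replaces any undelivered one.
				pending, send = zip(copyA(lastA), copyB(lastB)), out
			}
		}
	}()
	return out
}

func cloneInts(s []int) []int { return append([]int(nil), s...) }

// runZipDemo uses a zip func that mutates its first argument in place.
func runZipDemo() []int {
	a, b := make(chan []int), make(chan []int)
	sum := func(x, y []int) int { x[0] += y[0]; return x[0] }
	out := zipLatest(a, b, cloneInts, cloneInts, sum)
	var got []int
	a <- []int{1}
	b <- []int{10}
	got = append(got, <-out)
	b <- []int{20}
	got = append(got, <-out)
	close(a)
	close(b)
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runZipDemo()) // prints [11 21]
}
```

Without the copies, the first call would have changed the remembered value from 1 to 11, and the second output would have been 31 instead of 21.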
func ZipCopyX ¶
func ZipCopyX[T, U, Out any]( a <-chan Result[T], b <-chan Result[U], deepCopyA func(T) T, deepCopyB func(U) U, zipFunc func(a T, b U) (Out, error), ) <-chan Result[Out]
ZipCopyX is like ZipCopy, but propagates errors from within results automatically. If either input provides an error, the output will emit that error wrapped in an appropriately-typed result. If both inputs provide an error, the output will emit both errors joined with errors.Join within an appropriately-typed result.
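The error-combination rule for a single pair of results can be sketched on its own. The result type and the names zipResults and describeZipOutcomes are hypothetical, introduced only for this illustration; errors.Join is the standard library function named above.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical stand-in for a value-plus-error pair.
type result[T any] struct {
	Value T
	Err   error
}

// zipResults applies the rule described above to one pair of results: a lone
// error passes through, two errors are joined, and zip only runs when neither
// input failed.
func zipResults[T, U, Out any](a result[T], b result[U], zip func(T, U) (Out, error)) result[Out] {
	switch {
	case a.Err != nil && b.Err != nil:
		return result[Out]{Err: errors.Join(a.Err, b.Err)}
	case a.Err != nil:
		return result[Out]{Err: a.Err}
	case b.Err != nil:
		return result[Out]{Err: b.Err}
	}
	v, err := zip(a.Value, b.Value)
	return result[Out]{Value: v, Err: err}
}

// describeZipOutcomes exercises the success, single-error, and double-error cases.
func describeZipOutcomes() []string {
	errA, errB := errors.New("a failed"), errors.New("b failed")
	add := func(x, y int) (int, error) { return x + y, nil }
	ok := zipResults(result[int]{Value: 1}, result[int]{Value: 2}, add)
	onlyA := zipResults(result[int]{Err: errA}, result[int]{Value: 2}, add)
	both := zipResults(result[int]{Err: errA}, result[int]{Err: errB}, add)
	return []string{
		fmt.Sprint(ok.Value),
		fmt.Sprint(onlyA.Err),
		fmt.Sprint(errors.Is(both.Err, errA) && errors.Is(both.Err, errB)),
	}
}

func main() {
	fmt.Println(describeZipOutcomes()) // prints [3 a failed true]
}
```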
func ZipR
deprecated
func ZipR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U, aErr, bErr error) (Out, error)) <-chan Result[Out]
ZipR combines two streams of types T and U into a stream of type Out. ZipR works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked to synthesize an output value. The output channel will close when both input channels close.
Deprecated: This function makes it too easy to accidentally alias mutable memory further in the pipeline. For this function to be safe, the zipFunc must copy all values it uses, and that's too easy to forget to do. Use ZipCopyX instead, for the added benefit of not having to manually handle errors.
func ZipWrapR
deprecated
func ZipWrapR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out]
ZipWrapR behaves as ZipR except that it handles combining errors from channels by wrapping them together and will not invoke the zipFunc if either channel errored (instead emitting the error result).
- If a errored and b did not, set a's error as the output error
- If b errored and a did not, set b's error as the output error
- If both a and b errored, set an error wrapping both as the output error
- If there is an output error, return the zero value of Out and that error.
- Otherwise, run zipFunc and return its results.
Deprecated: This function makes it too easy to accidentally alias mutable memory further in the pipeline. For this function to be safe, the zipFunc must copy all values it uses, and that's too easy to forget to do. Use ZipCopyX instead, for the added benefit of not having to manually handle errors.
func ZipX
deprecated
func ZipX[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out]
ZipX is like Zip, but propagates errors from within results automatically. If either input provides an error, the output will emit that error wrapped in an appropriately-typed result. If both inputs provide an error, the output will emit both errors joined with errors.Join within an appropriately-typed result.
Deprecated: This function makes it too easy to accidentally alias mutable memory further in the pipeline. For this function to be safe, the zipFunc must copy all values it uses, and that's too easy to forget to do. Use ZipCopyX instead.
Types ¶
type ARCPool ¶
type ARCPool[V PoolKeyer[K], K comparable, T any] struct {
// contains filtered or unexported fields
}
ARCPool provides a reference-counted stream. The underlying stream is shared across all streamers using the same key, and is shut down when no streamers remain interested. This allows stream resources to be shared across all streamers and released once there are no longer any streamers for a given key.
func NewARCPool ¶
func NewARCPool[V PoolKeyer[K], K comparable, T any](mutator *Mutator, provider func(context.Context, V) <-chan T) *ARCPool[V, K, T]
NewARCPool creates an ARCPool powered by the provided mutator.
type ComparableKeyer ¶
type ComparableKeyer[K comparable] struct {
// contains filtered or unexported fields
}
ComparableKeyer wraps simple comparable types so that they implement PoolKeyer.
func Comparable ¶
func Comparable[K comparable](k K) ComparableKeyer[K]
Comparable wraps the given k as a ComparableKeyer.
func (ComparableKeyer[K]) MutationPoolKey ¶
func (c ComparableKeyer[K]) MutationPoolKey() K
MutationPoolKey returns the comparable key for this ComparableKeyer.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller manages the lifecycle of asynchronous streams of data, connecting them to the frame event loop of a given application window.
func NewController ¶
func NewController(ctx context.Context, invalidator func()) *Controller
NewController constructs a controller bound to the window lifecycle of a single application window. The provided invalidator func must trigger an invalidation of that window, and the Sweep() method must be invoked during the processing of each frame for that window. A controller can be shared among all async loading for a single window.
func (*Controller) Done ¶
func (s *Controller) Done()
Done cleans up all streams. It should be invoked when an application window is closed in order to ensure that all associated processing shuts down with the window.
func (*Controller) Sweep ¶
func (s *Controller) Sweep() (active, swept int)
Sweep cleans up inactive streams. It must be invoked once per frame by the event loop for the window that the stream is bound to. It returns the number of active streams after the sweep, as well as the number of streams that were swept away by the call (making them inert). Note that this is not the same as the number of inert streams.
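The two return values are easiest to understand with a toy mark-and-sweep model. Everything below (controller, streamState, read, sweep, simulateFrames) is a hypothetical sketch of the bookkeeping described above, not the package's internals.

```go
package main

import "fmt"

// streamState is a hypothetical per-stream record.
type streamState struct {
	active bool // currently producing values
	read   bool // read by layout code since the last sweep
}

// controller is a toy model of the frame lifecycle bookkeeping.
type controller struct {
	streams map[string]*streamState
}

// read marks a stream as used this frame, reactivating it if it was inert.
func (c *controller) read(name string) {
	s, ok := c.streams[name]
	if !ok {
		s = &streamState{}
		c.streams[name] = s
	}
	s.active, s.read = true, true
}

// sweep deactivates active streams that were not read this frame and reports
// how many streams remain active and how many this call made inert.
func (c *controller) sweep() (active, swept int) {
	for _, s := range c.streams {
		switch {
		case s.active && s.read:
			active++
		case s.active:
			s.active = false
			swept++
		}
		s.read = false // reset the mark for the next frame
	}
	return active, swept
}

// simulateFrames reads a set of streams per frame and records each sweep result.
func simulateFrames() [][2]int {
	c := &controller{streams: map[string]*streamState{}}
	frames := [][]string{{"a", "b"}, {"a"}, {"a"}, {"a", "b"}}
	var counts [][2]int
	for _, reads := range frames {
		for _, name := range reads {
			c.read(name)
		}
		active, swept := c.sweep()
		counts = append(counts, [2]int{active, swept})
	}
	return counts
}

func main() {
	fmt.Println(simulateFrames()) // prints [[2 0] [1 1] [1 0] [2 0]]
}
```

In the third frame stream b is still inert, yet swept is 0 because that particular call did not deactivate anything. That is the distinction the note about inert streams draws.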
type ErrUncleanMutatorShutdown ¶
type ErrUncleanMutatorShutdown struct {
MutationCount int
}
ErrUncleanMutatorShutdown occurs when one or more mutations haven't exited prior to the shutdown timeout elapsing.
func (ErrUncleanMutatorShutdown) Error ¶
func (err ErrUncleanMutatorShutdown) Error() string
type Input ¶
type Input[T any] struct {
// contains filtered or unexported fields
}
Input provides data to a stream in a threadsafe way and without blocking (unless you perform concurrent sends, in which case it may block). It can be used to feed data into a stream data transformation pipeline from the UI event loop.
func NewDistinctInput ¶
func NewDistinctInput[T comparable](initialValue T) Input[T]
NewDistinctInput constructs an input like NewInput, but the input will not emit a value that is the same as the most recently emitted value. In this case, sameness is defined as having equal values (==).
func NewInput ¶
NewInput constructs an input, setting it to emit initialValue the first time it is read.
func NewInputEmpty ¶
NewInputEmpty creates a new input with no initial value. Applications may supply a filter function that will drop sending elements if it returns true.
func NewInputEmptyCtx ¶
func NewInputEmptyCtx[T any](ctx context.Context, filter func(lastValue T, newValue T) bool) Input[T]
NewInputEmptyCtx creates a new input with no initial value. Streams reading from the input will end when the provided context is cancelled. Applications may supply a filter function that will drop sending elements if it returns true.
func (*Input[T]) Send ¶
func (s *Input[T]) Send(t T)
Send emits t on the input, replacing any previously-emitted value that has not already been consumed.
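One common way to get "replace the unconsumed value" semantics is a channel with a buffer of one. The sketch below assumes a single sender and uses hypothetical names (latest, newLatest, send, runLatestDemo); it is not taken from the package.

```go
package main

import "fmt"

// latest is a hypothetical single-slot mailbox that keeps only the newest
// unconsumed value.
type latest[T any] struct {
	ch chan T
}

func newLatest[T any]() *latest[T] {
	return &latest[T]{ch: make(chan T, 1)}
}

// send stores v, discarding any value that has not been consumed yet.
// Assumption: only one goroutine calls send at a time.
func (l *latest[T]) send(v T) {
	for {
		select {
		case l.ch <- v:
			return
		default:
		}
		select {
		case <-l.ch: // drop the stale value, then retry the send
		default:
		}
	}
}

// runLatestDemo sends three values with no reader and reports what a later
// reader sees and whether the slot is empty afterwards.
func runLatestDemo() (value int, drained bool) {
	in := newLatest[int]()
	in.send(1)
	in.send(2)
	in.send(3)
	value = <-in.ch
	select {
	case <-in.ch:
	default:
		drained = true
	}
	return value, drained
}

func main() {
	fmt.Println(runLatestDemo()) // prints 3 true
}
```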
type KeyedMutationPool ¶
type KeyedMutationPool[V PoolKeyer[K], K comparable, T any] struct {
// contains filtered or unexported fields
}
KeyedMutationPool manages a pool of [Mutation]s with a centrally-configured MutationProvider function. This provides a convenient API for pools of mutations whose arguments are contained entirely within their PoolKeyer implementation.
func NewKeyedMutationPool ¶
func NewKeyedMutationPool[V PoolKeyer[K], K comparable, T any](mutator *Mutator, provider func(ctx context.Context, v V) <-chan T) *KeyedMutationPool[V, K, T]
NewKeyedMutationPool defines a new KeyedMutationPool with a provider function that is responsible for actually doing the work of the Mutation used for each key.
func (*KeyedMutationPool[V, K, T]) MutationForKey ¶
func (a *KeyedMutationPool[V, K, T]) MutationForKey(v V) (*Mutation[T], bool)
MutationForKey returns the mutation for a given key and whether it is a new mutation instance.
func (*KeyedMutationPool[V, K, T]) Stream ¶
func (a *KeyedMutationPool[V, K, T]) Stream(ctx context.Context) <-chan map[K]*Mutation[T]
Stream all of the in-progress mutations.
func (*KeyedMutationPool[V, K, T]) StreamKey ¶
func (a *KeyedMutationPool[V, K, T]) StreamKey(ctx context.Context, v V) <-chan T
StreamKey either starts streaming for a given V or returns the existing channel streaming its output.
type Mutation ¶
type Mutation[T any] struct {
// contains filtered or unexported fields
}
Mutation is a handle on the results of an asynchronous application state change.
A Mutation is associated with a goroutine, such that the context controls the lifetime of the goroutine and the [done] channel signals when that goroutine has exited.
The goroutine is expected to be well behaved: exiting when the context is cancelled and closing [done] before returning.
func Mutate ¶
func Mutate[K comparable, T any](pool *MutationPool[K, T], key K, provider MutationProvider[T]) (mut *Mutation[T], isNew bool)
Mutate attempts to start a mutation using the provider and bound to the unique key. If key is already registered to a running mutation, that mutation instance will be returned instead.
To be well behaved, the provider must exit when the context is cancelled and must close the output channel prior to returning. Failure to do so will result in leaked goroutines.
If the returned mutation is nil, no mutations can be started because the mutator is shut down. If the returned mutation is non-nil, the isNew return value indicates whether it is a newly-spawned mutation, or a reference to an already-running mutation for the same key.
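The key-based deduplication and the isNew flag can be modelled with a mutex-guarded map. This is a minimal sketch under assumptions, with hypothetical names (pool, task, start, runDedupDemo); it leaves out contexts, output channels, and mutator shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical handle on running work.
type task struct {
	done chan struct{}
}

// pool deduplicates running work by key.
type pool[K comparable] struct {
	mu      sync.Mutex
	running map[K]*task
}

// start launches work for key unless work for that key is already running,
// in which case the existing handle is returned with isNew == false.
func (p *pool[K]) start(key K, work func()) (*task, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if existing, ok := p.running[key]; ok {
		return existing, false
	}
	t := &task{done: make(chan struct{})}
	p.running[key] = t
	go func() {
		defer close(t.done) // signal completion after the key is released
		work()
		p.mu.Lock()
		delete(p.running, key)
		p.mu.Unlock()
	}()
	return t, true
}

// runDedupDemo starts the same key twice while it is running, then once more
// after it has finished.
func runDedupDemo() (flags []bool, runs int) {
	p := &pool[string]{running: map[string]*task{}}
	release := make(chan struct{})
	first, firstNew := p.start("save", func() { runs++; <-release })
	second, secondNew := p.start("save", func() { runs++ })
	close(release)
	<-first.done
	third, thirdNew := p.start("save", func() { runs++ })
	<-third.done
	return []bool{firstNew, secondNew, first == second, thirdNew}, runs
}

func main() {
	fmt.Println(runDedupDemo()) // prints [true false true true] 2
}
```

The second call never runs its work function: it receives the handle of the first, which is the singleflight behaviour the key provides.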
func MutateKeyed ¶
func MutateKeyed[V PoolKeyer[K], K comparable, T any](pool *MutationPool[K, T], v V, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)
MutateKeyed is identical to Mutate except that it automatically derives the mutation key from a provided PoolKeyer instead of requiring the key to be passed explicitly. This can sometimes be more ergonomic.
func MutateSimple ¶
func MutateSimple[K comparable, T any]( pool *MutationPool[K, T], key K, provider func(context.Context) T, ) (mut *Mutation[T], isNew bool)
MutateSimple makes it easy to construct a mutation that simply calls a synchronous provider function and returns the results once.
func MutateTimeout ¶
func MutateTimeout[K comparable, T any](pool *MutationPool[K, T], key K, timeout time.Duration, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)
MutateTimeout does the same thing as Mutate, but sets a timeout on the mutation's context.
func PersistentStream ¶
func PersistentStream[K comparable, T any](pool *MutationPool[K, T], key K, prov Provider[T]) (mut *Mutation[T], isNew bool)
PersistentStream attempts to start a stream with its lifecycle bound to a mutation pool instead of the UI. This is useful when many UI streams want to consume the same data, as they can all consume the output of a single persistent stream (via the returned mutation's Stream method) instead of each independently querying or generating the source data of interest. If key is already registered to a running mutation, that mutation instance will be returned instead.
NOTE(whereswaldon): the existence of this method points to mutations being misnamed. They are really a kind of stream that has a different lifecycle and are allowed to have side effects within the application. This method provides a way to access the different lifecycle while committing to not have side effects. It's unclear what a better name would be though.
If the returned mutation is nil, no mutations can be started because the mutator is shut down. If the returned mutation is non-nil, the isNew return value indicates whether it is a newly-spawned mutation, or a reference to an already-running mutation for the same key.
type MutationPool ¶
type MutationPool[K comparable, T any] struct {
// contains filtered or unexported fields
}
MutationPool forms a namespace for mutations whereby identical keys are deduplicated. This provides singleflight mechanics, guaranteeing that concurrent attempts to launch the same unit of work do not overlap. The current set of mutations in the pool can be observed via the Stream method.
func NewMutationPool ¶
func NewMutationPool[K comparable, T any](mutator *Mutator) *MutationPool[K, T]
type MutationProvider ¶
MutationProvider represents async work typically powered by a single goroutine.
This work can emit values over the output channel to communicate state changes.
To be well behaved, the provider must exit when the context is cancelled and close the output channel before returning.
The lifetime of the context is bound to the parent Mutation and in turn the Mutator. Thus the context can be cancelled by a call to Mutation.Cancel or a call to Mutator.Shutdown.
type Mutator ¶
type Mutator struct {
// contains filtered or unexported fields
}
Mutator manages the lifecycle of asynchronous application state mutations.
func NewMutator ¶
NewMutator creates a new mutator which will spawn all mutations using the provided context. The provided timeout configures how long Mutator.Shutdown will wait before giving up on a clean shutdown, see the docs on that method for details.
func (*Mutator) Shutdown ¶
Shutdown shuts down the Mutator. It prevents new mutations from spawning and blocks until all existing mutations complete. Once the configured timeout elapses, the root context is cancelled and mutations have that same interval to exit cleanly. If one or more mutations haven't exited in time, an error is returned.
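The two-phase wait can be sketched with a WaitGroup and a cancellable context. The supervisor type and its methods are hypothetical, and the sketch omits the part that refuses new work after shutdown begins.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// supervisor is a hypothetical stand-in that tracks running work.
type supervisor struct {
	ctx     context.Context
	cancel  context.CancelFunc
	timeout time.Duration
	wg      sync.WaitGroup
}

func newSupervisor(timeout time.Duration) *supervisor {
	ctx, cancel := context.WithCancel(context.Background())
	return &supervisor{ctx: ctx, cancel: cancel, timeout: timeout}
}

// spawn runs work in a goroutine that shares the supervisor's root context.
func (s *supervisor) spawn(work func(context.Context)) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		work(s.ctx)
	}()
}

// shutdown waits up to timeout for work to finish on its own, then cancels
// the root context and waits up to timeout again before reporting failure.
func (s *supervisor) shutdown() error {
	done := make(chan struct{})
	go func() {
		s.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		s.cancel()
		return nil
	case <-time.After(s.timeout):
	}
	s.cancel() // second phase: ask remaining work to stop
	select {
	case <-done:
		return nil
	case <-time.After(s.timeout):
		return errors.New("unclean shutdown: work still running")
	}
}

// runShutdownDemo compares work that honours cancellation with work that ignores it.
func runShutdownDemo() (cleanOK, stuckFailed bool) {
	polite := newSupervisor(20 * time.Millisecond)
	polite.spawn(func(ctx context.Context) { <-ctx.Done() })
	cleanOK = polite.shutdown() == nil

	stuck := newSupervisor(20 * time.Millisecond)
	block := make(chan struct{})
	stuck.spawn(func(context.Context) { <-block }) // ignores its context
	stuckFailed = stuck.shutdown() != nil
	close(block) // release the goroutine so the demo does not leak it
	return cleanOK, stuckFailed
}

func main() {
	fmt.Println(runShutdownDemo()) // prints true true
}
```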
type PoolKeyer ¶
type PoolKeyer[K comparable] interface {
	// MutationPoolKey returns a key that identifies the work performed by a mutation for deduplication
	// purposes. If two mutations have the same key, [MutationPool]s will deduplicate them so that only
	// one can execute at a time.
	MutationPoolKey() K
}
PoolKeyer describes a type that can generate a unique mutation pool key for itself.
type Provider ¶
Provider is a function that returns a channel of values which will be closed when the provided context is cancelled. Provider functions are usually used to provide data as input to a stream's asynchronous processing. Implementations may close the output channel early to indicate that the most recently returned value is the final value for the stream.
type ProviderR ¶
ProviderR is a function that returns a channel of [Result]s which will be closed when the provided context is cancelled. ProviderR functions are usually used to provide data and errors as input to a stream's asynchronous processing. Implementations may close the output channel early to indicate that the most recently returned value is the final value for the stream.
type Result ¶
Result is a convenience type for bundling data and an error together so they can be sent over a channel.
func ResultFrom ¶
ResultFrom constructs a Result by bundling the given t and e together.
type ResultStream ¶
ResultStream provides Result[T]s from asynchronous logic. It can be read safely without blocking, and must be read each frame in order to stay active. If it goes inactive, reading it again will reactivate it automatically.
func NewR ¶
func NewR[T any](controller *Controller, provider ProviderR[T]) *ResultStream[T]
NewR creates a stream using the provided controller and provider. The provider will only be invoked when activating or reactivating the stream, not each time the stream is read. The channel returned from the provider must not close until the context is cancelled.
func NewWithAppendR ¶
func NewWithAppendR[T any](controller *Controller, provider ProviderR[[]T]) *ResultStream[[]T]
NewWithAppendR is NewWithMergeR with a merge func that appends new elements to existing elements. If either the old or new element contains an error, the errors will be merged with errors.Join.
func NewWithMergeR ¶
func NewWithMergeR[T any](controller *Controller, merge func(old, new Result[T]) Result[T], provider ProviderR[T]) *ResultStream[T]
NewWithMergeR is NewR with a merge func. See NewWithMerge for details on the merge func.
func (*ResultStream[T]) Read ¶
func (s *ResultStream[T]) Read(gtx layout.Context) (value T, status Status, err error)
Read a value from the stream, if any. The returned status indicates whether the returned value and err have a meaningful value.
func (*ResultStream[T]) ReadDefault ¶
func (s *ResultStream[T]) ReadDefault(gtx layout.Context, t T) (T, error)
ReadDefault reads from the stream, returning the provided default value if no value is available yet on the stream.
func (*ResultStream[T]) ReadInto ¶
func (s *ResultStream[T]) ReadInto(gtx layout.Context, t *T, def T) error
ReadInto reads from the stream and (if any value is available) assigns the latest value to t. If no value is available, def is assigned to t. The returned error is the error result of the latest stream value or nil.
func (*ResultStream[T]) ReadNew ¶
func (s *ResultStream[T]) ReadNew(gtx layout.Context) (value T, isNew bool, err error)
ReadNew returns the current value available from the stream (which may be the zero value if no value has ever been received) and a boolean indicating whether that value is newly emitted during the current frame. This function serves to shorten the common idiom:
if value, status, err := s.Read(gtx); status == stream.Emitting {
	// Do logic that should only occur when a value is emitted from the stream.
}
It can be written as:
if value, ok, err := s.ReadNew(gtx); ok {
	// Do logic that should only occur when a value is emitted from the stream.
}
type Singleton ¶
type Singleton[T any] struct {
// contains filtered or unexported fields
}
Singleton is a specialized MutationPool that manages only one mutation. One instance of the mutation is allowed at a time. If the mutation ends, another is allowed to start on the next call to Singleton.Run.
func NewSingleton ¶
NewSingleton makes a Singleton with its lifecycle managed by the given mutator.
type Source ¶
type Source[S, T any] struct {
// contains filtered or unexported fields
}
Source helps construct [Provider]s that emit a single, shared value. It may not be suitable for all applications. Streams created using a Source will receive the latest available value, but may not receive every value emitted by the valuer if new values arrive quickly. S is an internal state type, protected by the source's lock, that is used to track the current state of the source. T is the result type emitted on streams from this source. If constructed with NewSourceCtx, a Source can be "closed" much like a channel by cancelling its input context.
func NewSource ¶
NewSource constructs a source using the provided valuer function to transform its current state (S) into a T. valuer must be idempotent. The boolean return value from valuer indicates whether the T should be emitted over the stream or discarded. The valuer should deep copy all data it uses in T to ensure that other invocations of valuer do not reference the same memory.
func NewSourceCtx ¶
NewSourceCtx constructs a new source that will close all output streams when the provided context is cancelled. Calls to *Source.Update after the provided context is cancelled will have no effect.
func NewSourceWithAppendCtx ¶
func NewSourceWithAppendCtx[S, T any](ctx context.Context, valuer func(S) ([]T, bool)) *Source[S, []T]
NewSourceWithAppendCtx is NewSourceWithMergeCtx, but it uses a merge func that appends new elements to the end of old elements.
func NewSourceWithMergeCtx ¶
func NewSourceWithMergeCtx[S, T any](ctx context.Context, merge func(old, new T) T, valuer func(S) (T, bool)) *Source[S, T]
NewSourceWithMergeCtx constructs a new source that will close all output streams when the provided context is cancelled. Calls to *Source.Update after the provided context is cancelled will have no effect.
func (*Source[S, T]) Stream ¶
Stream is a Provider function. The returned channel will close when the provided context is cancelled or (if the Source was constructed with NewSourceCtx) when the source's context is cancelled, and will emit any values set by Update() for which the valuer function provided at construction returns true. If Update() is invoked quickly, only the final value is guaranteed to be emitted on the channel returned by Stream.
func (*Source[S, T]) Update ¶
func (s *Source[S, T]) Update(fn func(oldState S) S)
Update runs fn with the source's lock held, passing the current state to fn and setting the state to the return value of fn. If this Source was constructed with NewSourceCtx and the context has been cancelled, calls to Update will have no effect.
func (*Source[S, T]) UpdateIf ¶
UpdateIf runs fn with the source's lock held, passing the current state to fn and setting the state to the return value of fn iff fn returns true. If fn returns false, the returned value is discarded and no value will be sent on the stream's output. If this Source was constructed with NewSourceCtx and the context has been cancelled, calls to UpdateIf will have no effect.
type Status ¶
type Status uint8
Status describes the state of the data read from the stream.
const (
	// Waiting indicates that the stream has never received a value.
	Waiting Status = iota
	// Emitting indicates that the stream is emitting a new value.
	Emitting
	// Cached indicates that the stream is emitting a cached copy of the most
	// recently received value.
	Cached
	// Complete indicates that the stream provider closed its output channel after
	// emitting at least one value and without being cancelled. This indicates that
	// the work for this stream is complete and the stream need not be restarted.
	// The last value received over the channel will always be returned with this
	// status.
	Complete
	// Incomplete indicates that the stream provider closed its output channel without
	// ever emitting a value. This is usually a bug in the stream provider. Any stream
	// value received with this status should be ignored.
	Incomplete
	// Uninitialized means that the stream has never been constructed. This status is
	// only returned if a nil stream is read.
	Uninitialized
)
type Stream ¶
type Stream[T any] struct {
// contains filtered or unexported fields
}
Stream provides Ts from asynchronous logic. It can be read safely without blocking, and must be read each frame in order to stay active. If it goes inactive, reading it again will reactivate it automatically. A nil stream can be read from safely, but it is invalid to construct a stream literal without invoking a constructor function. Such streams will panic when used.
func New ¶
func New[T any](controller *Controller, provider Provider[T]) *Stream[T]
New creates a stream using the provided controller and provider. The provider will only be invoked when activating or reactivating the stream, not each time the stream is read. The channel returned from the provider must not close until the context is cancelled.
func NewWithAppend ¶
func NewWithAppend[T any](controller *Controller, provider Provider[[]T]) *Stream[[]T]
NewWithAppend is like NewWithMerge, but provides a merge func that automatically appends new elements to the end of existing elements. This only works with elements of slice type.
func NewWithMerge ¶
func NewWithMerge[T any](controller *Controller, merge func(old, new T) T, provider Provider[T]) *Stream[T]
NewWithMerge is like New, but it configures the stream to merge elements waiting for delivery to the UI using the provided merge func. This ensures that no element is dropped/overwritten by a subsequent element on its way to the UI.
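The effect of a merge func on values that are still waiting for delivery can be sketched with a single pending slot. The mergeBox type and its put and take methods are hypothetical names for this illustration and do not reflect how the package stores pending values.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeBox is a hypothetical single-slot buffer between a producer and the UI.
// When a new value arrives before the old one was taken, the two are merged
// instead of the old one being overwritten.
type mergeBox[T any] struct {
	mu      sync.Mutex
	pending T
	full    bool
	merge   func(older, newer T) T
}

// put stores v, merging it with any value still waiting for delivery.
func (b *mergeBox[T]) put(v T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.full {
		b.pending = b.merge(b.pending, v)
		return
	}
	b.pending, b.full = v, true
}

// take removes and returns the waiting value, if any.
func (b *mergeBox[T]) take() (T, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	v, ok := b.pending, b.full
	var zero T
	b.pending, b.full = zero, false
	return v, ok
}

// runMergeDemo puts two batches before the consumer takes anything.
func runMergeDemo() (batch []int, hadValue, emptyAfter bool) {
	box := &mergeBox[[]int]{
		merge: func(older, newer []int) []int { return append(older, newer...) },
	}
	box.put([]int{1})
	box.put([]int{2, 3}) // arrives before the first batch was delivered
	batch, hadValue = box.take()
	_, stillFull := box.take()
	return batch, hadValue, !stillFull
}

func main() {
	fmt.Println(runMergeDemo()) // prints [1 2 3] true true
}
```

With an append-style merge, as in NewWithAppend, both batches reach the consumer in one read rather than the first being lost.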
func Once ¶
func Once[T any](controller *Controller, do func(ctx context.Context) T) *Stream[T]
Once creates a stream that will only emit a single value, and will not restart itself if it completes successfully. This is useful for one-shot async computations.
func (*Stream[T]) Read ¶
Read a value from the stream, if any. The returned status indicates whether the returned value is meaningful.
func (*Stream[T]) ReadDefault ¶
ReadDefault reads from the stream, returning the provided default value if no value is available yet on the stream.
func (*Stream[T]) ReadInto ¶
ReadInto reads from the stream and (if any value is available) assigns the latest value to t. If no value is available, def is assigned to t.
func (*Stream[T]) ReadNew ¶
ReadNew returns the current value available from the stream (which may be the zero value if no value has ever been received) and a boolean indicating whether that value is newly emitted during the current frame. This function serves to shorten the common idiom:
if value, status := s.Read(gtx); status == stream.Emitting {
	// Do logic that should only occur when a value is emitted from the stream.
}
It can be written as:
if value, ok := s.ReadNew(gtx); ok {
	// Do logic that should only occur when a value is emitted from the stream.
}
Many streams do not need special handling for when events are emitted, and should not use this method.
type Transmitter ¶
type Transmitter[S, T any] struct {
// contains filtered or unexported fields
}
Transmitter allows sending messages to current streamers without impacting future streamers. It is similar to Source except that once a source is updated, future streamers will always receive the most recent value when starting a stream. Transmitter instead sends each value to current streamers only. New streamers will not receive any values until a new call to [Broadcast] emits a new value.
func NewTransmitter ¶
func NewTransmitter[S, T any](ctx context.Context, valuer func(S) (T, bool)) *Transmitter[S, T]
NewTransmitter constructs a new transmitter that will close all output streams when the provided context is cancelled. Calls to *Transmitter.Broadcast after the provided context is cancelled will have no effect.
func NewTransmitterWithMerge ¶
func NewTransmitterWithMerge[S, T any](ctx context.Context, merge func(old, new T) T, valuer func(S) (T, bool)) *Transmitter[S, T]
NewTransmitterWithMerge constructs a new transmitter that will close all output streams when the provided context is cancelled. Calls to *Transmitter.Broadcast after the provided context is cancelled will have no effect.
func (*Transmitter[S, T]) Broadcast ¶
func (t *Transmitter[S, T]) Broadcast(value S)
Broadcast will send value to all current streamers. Streams started after this call will not receive the value.
func (*Transmitter[S, T]) Stream ¶
func (t *Transmitter[S, T]) Stream(ctx context.Context) <-chan T
Stream values from the Transmitter. Values sent prior to this call returning will not be received.
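The "current streamers only" rule can be sketched with a set of subscriber channels. The transmitter type below is a hypothetical simplification: it has no valuer or internal state type, and each subscriber keeps only the newest undelivered value.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// transmitter is a hypothetical broadcaster that delivers values only to
// subscribers that exist at the time of the broadcast.
type transmitter[T any] struct {
	mu   sync.Mutex
	subs map[chan T]struct{}
}

// stream registers a subscriber whose channel closes when ctx is cancelled.
func (t *transmitter[T]) stream(ctx context.Context) <-chan T {
	ch := make(chan T, 1)
	t.mu.Lock()
	t.subs[ch] = struct{}{}
	t.mu.Unlock()
	go func() {
		<-ctx.Done()
		t.mu.Lock()
		delete(t.subs, ch)
		close(ch) // closed under the lock so broadcast never sends on it
		t.mu.Unlock()
	}()
	return ch
}

// broadcast delivers v to every current subscriber without blocking.
func (t *transmitter[T]) broadcast(v T) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for ch := range t.subs {
		select {
		case <-ch: // drop a stale, undelivered value
		default:
		}
		ch <- v // the one-slot buffer has room now
	}
}

// runBroadcastDemo broadcasts once before and once after subscribing.
func runBroadcastDemo() (got string, closed bool) {
	t := &transmitter[string]{subs: map[chan string]struct{}{}}
	t.broadcast("early") // nobody is listening yet, so this is lost
	ctx, cancel := context.WithCancel(context.Background())
	sub := t.stream(ctx)
	t.broadcast("late")
	got = <-sub
	cancel()
	_, open := <-sub // blocks until the subscriber channel is closed
	return got, !open
}

func main() {
	fmt.Println(runBroadcastDemo()) // prints late true
}
```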
type UnkeyedPool ¶
type UnkeyedPool[T any] struct {
// contains filtered or unexported fields
}
UnkeyedPool manages the lifecycle of a group of mutations with no uniqueness invariants. Mutations do not need to provide a unique key that deduplicates instances, and every call to Run will create a new mutation.
func NewUnkeyedPool ¶
func NewUnkeyedPool[T any](mutator *Mutator) *UnkeyedPool[T]
NewUnkeyedPool creates a mutation pool with its lifecycle managed by the given mutator.
Directories ¶
Path | Synopsis |
---|---|
automaton | Package automaton defines streamable finite state machines that allow UIs to easily model stateful, multi-step application flows. |
sqlitestream | Package sqlitestream provides a streamable SQLite database abstraction. |
mattn_sqlx_stream | Package mattn_sqlx_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and sqlx query helpers. |
mattn_stdlib_stream | Package mattn_stdlib_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and Go stdlib database/sql package. |
sqlitewatch | Package sqlitewatch implements low-level SQLite operation watching. |