Documentation ¶
Overview ¶
Package streams provides a set of Source and Sink semantics to plug together object streams. Streams are closer to NodeJS object stream semantics than to bufio byte-level semantics, although nothing stops one from putting together a byte stream.
Index ¶
- Variables
- type Buffer
- func (s *Buffer[T]) Finish(ctx context.Context) error
- func (s *Buffer[T]) Pause(ctx context.Context) error
- func (s *Buffer[T]) ReadSlice(parent context.Context, to []T) (int, error)
- func (s *Buffer[T]) Resume(ctx context.Context) error
- func (s *Buffer[T]) SinkEvents() *SinkEvents[T]
- func (s *Buffer[T]) SourceEvents() *SourceEvents[T]
- func (s *Buffer[T]) Write(parent context.Context, value T) error
- type BufferOpt
- type ChannelPort
- type ChannelSink
- func (c *ChannelSink[T]) ConsumeEvent(parent context.Context, e channelPortFeedback) error
- func (c *ChannelSink[T]) Finish(ctx context.Context) error
- func (c *ChannelSink[T]) Resume(ctx context.Context) error
- func (c *ChannelSink[T]) SinkEvents() *SinkEvents[T]
- func (c *ChannelSink[T]) Write(parent context.Context, v T) error
- type ChannelSource
- func (c *ChannelSource[T]) Pause(ctx context.Context) error
- func (c *ChannelSource[T]) PumpTick(parent context.Context) (count int, err error)
- func (c *ChannelSource[T]) ReadSlice(parent context.Context, to []T) (i int, err error)
- func (c *ChannelSource[T]) Resume(parent context.Context) error
- func (c *ChannelSource[T]) SourceEvents() *SourceEvents[T]
- func (c *ChannelSource[T]) WaitOnEvent(ctx context.Context) error
- type ConnectedPipe
- type ConnectedPipeOption
- type FanOutSink
- type Sink
- type SinkAvailableEvent
- type SinkEvents
- type SliceAccumulator
- type Source
- type SourceEvents
- type TransformFunc
- type Transformer
- func (t *Transformer[I, O]) Finish(ctx context.Context) error
- func (t *Transformer[I, O]) Pause(ctx context.Context) error
- func (t *Transformer[I, O]) Pump(ctx context.Context, source Source[I], target Sink[O]) error (deprecated)
- func (t *Transformer[I, O]) ReadSlice(ctx context.Context, to []O) (int, error)
- func (t *Transformer[I, O]) Resume(ctx context.Context) error
- func (t *Transformer[I, O]) SinkEvents() *SinkEvents[I]
- func (t *Transformer[I, O]) SourceEvents() *SourceEvents[O]
- func (t *Transformer[I, O]) Write(ctx context.Context, v I) error
- type TransformerFunc
- type TransformerState
- type TransformingSink
Constants ¶
This section is empty.
Variables ¶
var Done = errors.New("stream done")
Done represents a writable stream which has been closed. A Sink may return this in the time between being instructed to close and finishing draining all elements from its buffer.
var End = errors.New("stream end")
End indicates a source has read all elements available on the stream.
var Full = errors.New("stream full")
Full represents a writable stream whose internal buffers have been filled. A Full error provides a clear signal for backpressure on the emitting source.
var Overflow = errors.New("stream overflow")
Overflow indicates the stream would overflow with the given item.
var UnderRun = errors.New("stream under run")
Functions ¶
This section is empty.
Types ¶
type Buffer ¶
type Buffer[T any] struct { Output []T // contains filtered or unexported fields }
Buffer will hold up to a limit of elements before placing back pressure on a writer. Usable as a source also.
func (*Buffer[T]) Finish ¶
Finish will transition the buffer into FinalFlush mode until all elements are read, at which point the buffer will then transition into Finished.
func (*Buffer[T]) SinkEvents ¶
func (s *Buffer[T]) SinkEvents() *SinkEvents[T]
func (*Buffer[T]) SourceEvents ¶
func (s *Buffer[T]) SourceEvents() *SourceEvents[T]
type BufferOpt ¶
func WithBufferTracePrefix ¶
type ChannelPort ¶
type ChannelPort[T any] struct { Input *ChannelSink[T] Feedback chan channelPortFeedback Output *ChannelSource[T] }
func NewChannelPort ¶
func NewChannelPort[T any](size int) *ChannelPort[T]
type ChannelSink ¶
type ChannelSink[T any] struct { Push func(ctx context.Context) error // contains filtered or unexported fields }
ChannelSink bridges a Golang `chan` into the Sink interface. Because channels have no feedback mechanism, some events will not fire as expected without additional aid.
func NewChannelSink ¶
func NewChannelSink[T any](target chan<- T) *ChannelSink[T]
func (*ChannelSink[T]) ConsumeEvent ¶
func (c *ChannelSink[T]) ConsumeEvent(parent context.Context, e channelPortFeedback) error
func (*ChannelSink[T]) SinkEvents ¶
func (c *ChannelSink[T]) SinkEvents() *SinkEvents[T]
type ChannelSource ¶
type ChannelSource[T any] struct { // contains filtered or unexported fields }
func NewChannelSource ¶
func NewChannelSource[T any](pipe <-chan T) *ChannelSource[T]
func (*ChannelSource[T]) PumpTick ¶
func (c *ChannelSource[T]) PumpTick(parent context.Context) (count int, err error)
func (*ChannelSource[T]) ReadSlice ¶
func (c *ChannelSource[T]) ReadSlice(parent context.Context, to []T) (i int, err error)
func (*ChannelSource[T]) SourceEvents ¶
func (c *ChannelSource[T]) SourceEvents() *SourceEvents[T]
func (*ChannelSource[T]) WaitOnEvent ¶
func (c *ChannelSource[T]) WaitOnEvent(ctx context.Context) error
type ConnectedPipe ¶
type ConnectedPipe[E any] struct { // contains filtered or unexported fields }
func Connect ¶
func Connect[E any](ctx context.Context, from Source[E], to Sink[E], opts ...ConnectedPipeOption) (*ConnectedPipe[E], error)
Connect allows events to flow from a source to a sink.
type ConnectedPipeOption ¶
type ConnectedPipeOption func(c *connectedPipeConfig) error
func WithSuppressEnd ¶
func WithSuppressEnd() ConnectedPipeOption
func WithTraceAttributes ¶
func WithTraceAttributes(attrs ...attribute.KeyValue) ConnectedPipeOption
func WithTracePrefix ¶
func WithTracePrefix(tracePrefix string) ConnectedPipeOption
type FanOutSink ¶
type FanOutSink[T any] struct { // contains filtered or unexported fields }
func NewFanOutSink ¶
func NewFanOutSink[T any]() *FanOutSink[T]
func (*FanOutSink[T]) Add ¶
func (f *FanOutSink[T]) Add(target Sink[T])
func (*FanOutSink[T]) SinkEvents ¶
func (f *FanOutSink[T]) SinkEvents() *SinkEvents[T]
type Sink ¶
type Sink[T any] interface { // Write offers a given value v to the Sink. Standard errors are as follows: // * Done - If the stream has finished. No further writes will be consumed // * Full - Written element has filled the output buffers and further writes will result in Overflow // * Overflow - Element has been rejected as the sink has too many pending elements to be written // * Done - Element has been rejected as the sink is draining Write(ctx context.Context, v T) error // Finish instructs the sink to no longer accept writes, wait for internal buffers to drain, and finish. // The sink must emit at least the following events: // * Finishing - Indicates the sink is no longer accepting new writes // * Finished - Indicates the sink has completed flushing the internal buffer Finish(ctx context.Context) error // SinkEvents provides access to the possible events to be emitted by the sink SinkEvents() *SinkEvents[T] // Resume instructs the Sink to resume draining internal buffers and accept writes again. // Standard errors: // * Done - The stream is finishing or finished and is no longer accepting writes. Resume(ctx context.Context) error }
Sink represents a way to write elements to an abstract destination. Could be a memory buffer, through a channel, a database, etc. As a key counterpart to Source, a Sink provides back pressure semantics to allow buffers to be primed.
type SinkAvailableEvent ¶
type SinkAvailableEvent[T any] struct { // Space indicates how many elements can be written before Overflow is provided. This is advisory as other handlers // may have already written data. Sink.Write is the authoritative source of data for this Space int // Sink is the originating entity for this event. Sink Sink[T] }
type SinkEvents ¶
type SinkEvents[T any] struct { // Available is dispatched to indicate space is available in a Sink. This is typically done at the end of any // logical grouping or batch the sink has drained. This allows more efficient pushing. // // todo: consider merging with drained based on how clients will react. Available emitter.Dispatcher[SinkAvailableEvent[T]] // Drained is dispatched when the sink is depleted with no additional elements available. // todo: consider merging with available based on how clients will react. Drained emitter.Dispatcher[Sink[T]] // Full is dispatched when the sink is no longer accepting elements. Full emitter.Dispatcher[Sink[T]] // Finishing is emitted when the stream is waiting for all elements to be flushed but not yet finished. No further // elements will be accepted by the stream Finishing emitter.Dispatcher[Sink[T]] // Finished indicates the stream has completed flushing its internal buffer and has no further work as a sink to // complete. Finished emitter.Dispatcher[Sink[T]] }
SinkEvents are the available events to interact with the sink
type SliceAccumulator ¶
type SliceAccumulator[T any] struct { Output []T Done bool Events *SinkEvents[T] }
SliceAccumulator is a Sink for storing T in Output. Once Finish is called, the Finished event will be emitted.
func NewSliceAccumulator ¶
func NewSliceAccumulator[T any]() *SliceAccumulator[T]
func (*SliceAccumulator[T]) Finish ¶
func (s *SliceAccumulator[T]) Finish(ctx context.Context) error
func (*SliceAccumulator[T]) Resume ¶
func (s *SliceAccumulator[T]) Resume(ctx context.Context) error
func (*SliceAccumulator[T]) SinkEvents ¶
func (s *SliceAccumulator[T]) SinkEvents() *SinkEvents[T]
type Source ¶
type Source[T any] interface { SourceEvents() *SourceEvents[T] // ReadSlice reads up to len(to) elements from the source, returning the count of elements read into the buffer and // any problems encountered while reading. In the case 0 elements are available the source should return // UnderRun as the error. ReadSlice(ctx context.Context, to []T) (count int, problem error) // Resume begins emitting events of T until pushback is received from an event. Resume(ctx context.Context) error Pause(ctx context.Context) error }
type SourceEvents ¶
type SourceEvents[T any] struct { Data emitter.Dispatcher[T] End emitter.Dispatcher[Source[T]] }
SourceEvents are the set of events which may be emitted from a source to signal various conditions and states.
type Transformer ¶
func NewTransform ¶
func NewTransform[I any, O any](fn TransformerFunc[I, O]) *Transformer[I, O]
func (*Transformer[I, O]) ReadSlice ¶
func (t *Transformer[I, O]) ReadSlice(ctx context.Context, to []O) (int, error)
func (*Transformer[I, O]) SinkEvents ¶
func (t *Transformer[I, O]) SinkEvents() *SinkEvents[I]
func (*Transformer[I, O]) SourceEvents ¶
func (t *Transformer[I, O]) SourceEvents() *SourceEvents[O]
type TransformerFunc ¶
type TransformerState ¶
type TransformerState uint8
const ( TransformerPaused TransformerState = iota TransformerFlowing TransformerFinished )
type TransformingSink ¶
func WrapTransformingSink ¶
func WrapTransformingSink[I any, O any](wrap Sink[O], transformer TransformFunc[I, O]) *TransformingSink[I, O]
func (*TransformingSink[I, O]) Finish ¶
func (t *TransformingSink[I, O]) Finish(ctx context.Context) error
func (*TransformingSink[I, O]) Resume ¶
func (t *TransformingSink[I, O]) Resume(ctx context.Context) error
func (*TransformingSink[I, O]) SinkEvents ¶
func (t *TransformingSink[I, O]) SinkEvents() *SinkEvents[I]