Documentation ¶
Overview ¶
Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.
Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.
Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.
Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.
Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.
Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Applicative ¶ added in v0.3.0
type Applicative[T Identifiable] func(d T) T
Applicative is a function that is applied on an individual basis for each Packet in the payload. The resulting data replaces the old data
func (Applicative[T]) Component ¶ added in v1.0.0
func (x Applicative[T]) Component(output Edge[T]) Vertex[T]
type Builder ¶
type Builder[T Identifiable] interface { Map(a Applicative[T]) Builder[T] Window(x Window[T]) Builder[T] Sort(x Comparator[T]) Builder[T] Remove(x Remover[T]) Builder[T] Combine(x Combiner[T]) Builder[T] Filter(f Filter[T]) (Builder[T], Builder[T]) Loop(x Filter[T]) (loop, out Builder[T]) OutputTo(x chan []T) }
Builder is the interface provided for creating a data processing stream.
type Combiner ¶ added in v1.0.0
type Combiner[T Identifiable] func(payload []T) T
Combiner is a function used to combine a payload into a single Packet.
type Comparator ¶ added in v0.16.5
type Comparator[T Identifiable] func(a T, b T) int
Comparator is a function to compare 2 items
func (Comparator[T]) Component ¶ added in v1.0.0
func (x Comparator[T]) Component(output Edge[T]) Vertex[T]
Component is a function for providing a vertex that can be used to run individual components on the payload.
type Component ¶ added in v1.0.0
type Component[T Identifiable] interface { Component(e Edge[T]) Vertex[T] }
Component is an interface for providing a vertex that can be used to run individual components on the payload.
type Edge ¶ added in v0.17.0
type Edge[T Identifiable] interface { OutputTo(ctx context.Context, channel chan []T) Input(payload ...T) }
Edge is an interface that is used for transferring data between vertices
func AsEdge ¶ added in v1.0.0
func AsEdge[T Identifiable](c chan []T) Edge[T]
AsEdge is a helper function to create an edge from a channel.
type EdgeProvider ¶ added in v0.17.0
type EdgeProvider[T Identifiable] interface { New(name string, option *Option[T]) Edge[T] }
EdgeProvider is an interface for providing an edge that will be used to communicate between vertices.
type Filter ¶ added in v1.0.0
type Filter[T Identifiable] func(d T) FilterResult
Filter is a function that can be used to filter the payload.
type FilterResult ¶ added in v1.0.0
type FilterResult uint8
FilterResult is a type that is used to indicate the result of a filter.
const ( // FilterLeft is a filter result that indicates that the item should move to the left branch. FilterLeft FilterResult = iota // FilterRight is a filter result that indicates that the item should move to the right branch. FilterRight // FilterBoth is a filter result that indicates that the item should go down both paths using the option.Deepcopy fn if provided. FilterBoth )
func Duplicate ¶ added in v1.0.0
func Duplicate[T Identifiable](d T) FilterResult
Duplicate shorthand for FilterBoth.
type Identifiable ¶ added in v1.0.0
type Identifiable interface {
ID() string
}
Identifiable is an interface that is used for providing an ID for a packet
type Option ¶ added in v0.2.0
type Option[T Identifiable] struct { // FIFO controls the processing order of the payloads // If set to true the system will wait for one payload // to be processed before starting the next. // Default: false FIFO bool `json:"fifo,omitempty"` // BufferSize sets the buffer size on the edge channels between the // vertices, this setting can be useful when processing large amounts // of data with FIFO turned on. // Default: 0 BufferSize int `json:"buffer_size,omitempty"` // MaxParallel sets the maximum number of parallel goroutines is only applicable when FIFO is turned off. // Default: 0 MaxParallel int `json:"max_parallel,omitempty"` // Telemetry provides the ability to enable and configure telemetry Telemetry Telemetry[T] `json:"telemetry,omitempty"` // PanicHandler is a function that is called when a panic occurs // Default: log the panic and no-op PanicHandler func(err error, payload ...T) `json:"-"` // DeepCopy is a function to preform a deep copy of the Payload DeepCopy func(T) T `json:"-"` }
Option type for holding machine settings.
type Remover ¶ added in v0.16.5
type Remover[T Identifiable] func(index int, d T) bool
Remover func that is used to remove Data based on a true result
type Span ¶ added in v1.0.0
type Span[T Identifiable] interface { RecordPayload(payload ...T) RecordError(error) SpanEnd() }
Span type for holding telemetry settings.
type Stream ¶ added in v0.5.0
type Stream[T Identifiable] interface { Start(ctx context.Context) error StartWith(ctx context.Context, input ...chan []T) error Consume(input ...chan []T) Clear() Builder() Builder[T] }
Stream is a representation of a data stream and its associated logic.
The Builder method is the entrypoint into creating the data processing flow. All branches of the Stream are required to end in an OutputTo call.
func New ¶
func New[T Identifiable](name string, provider EdgeProvider[T], options *Option[T]) Stream[T]
New is a function for creating a new Stream.
name string provider EdgeProvider[T] option *Option[T]
func NewWithChannels ¶ added in v1.0.0
func NewWithChannels[T Identifiable](name string, options *Option[T]) Stream[T]
NewWithChannels is a function for creating a new Stream using channels to pass the data.
name string option *Option[T]
type Telemetry ¶ added in v1.0.0
type Telemetry[T Identifiable] interface { PayloadSize(string, int64) IncrementPayloadCount(string) IncrementErrorCount(string) Duration(string, time.Duration) StartSpan(string) Span[T] }
Telemetry type for holding telemetry settings.
type Vertex ¶ added in v0.3.0
type Vertex[T Identifiable] func(payload []T)
Vertex is a type used to process data for a stream.
type Window ¶ added in v1.0.0
type Window[T Identifiable] func(payload []T) []T
Window is a function to work on a window of data