README ¶
Streams
Streams is a lightweight, simple stream processing library. While Kafka is the main use case for Streams, it is flexible enough to be used for any form of processing from any source.
Note: This is currently a work in progress.
Installation
You can install Streams using go get:
go get github.com/msales/streams
Concepts
Streams breaks processing into the following basic parts.
- Message is a message in the system, consisting of a key, a value and a context.
- Source reads from a data source and handles the position within it.
- Processor processes the data, optionally passing it on or marking the source's position. A sink is just a processor that does not forward the data on.
- Pipe gives processors an abstract view of the current state, allowing Messages to flow through the system.
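The parts above can be sketched in a few lines of Go. This is a minimal, self-contained illustration rather than the library's implementation: the Message, Pipe and Processor types below are simplified local stand-ins (assumptions) for the types documented further down, and slicePipe, upperProcessor and run are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for a stream message (key and value only).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Pipe is a simplified stand-in that only supports forwarding.
type Pipe interface {
	Forward(Message) error
}

// Processor is a simplified stand-in for a stream processor.
type Processor interface {
	WithPipe(Pipe)
	Process(Message) error
}

// slicePipe is a hypothetical Pipe that records every forwarded message.
type slicePipe struct {
	out []Message
}

func (p *slicePipe) Forward(msg Message) error {
	p.out = append(p.out, msg)
	return nil
}

// upperProcessor upper-cases string values and forwards them downstream.
type upperProcessor struct {
	pipe Pipe
}

func (p *upperProcessor) WithPipe(pipe Pipe) { p.pipe = pipe }

func (p *upperProcessor) Process(msg Message) error {
	s, ok := msg.Value.(string)
	if !ok {
		return fmt.Errorf("unexpected value type %T", msg.Value)
	}
	msg.Value = strings.ToUpper(s)
	return p.pipe.Forward(msg)
}

// run plays the role of a source: it feeds each value into the processor
// and returns whatever the processor forwarded.
func run(values []string) ([]Message, error) {
	pipe := &slicePipe{}
	var proc Processor = &upperProcessor{}
	proc.WithPipe(pipe)
	for i, v := range values {
		if err := proc.Process(Message{Key: i, Value: v}); err != nil {
			return nil, err
		}
	}
	return pipe.out, nil
}

func main() {
	out, err := run([]string{"a", "b"})
	fmt.Println(out, err)
}
```

A processor that returned nil without calling Forward would act as a sink in this sketch.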
Documentation ¶
Index ¶
- Variables
- type BranchProcessor
- type Committer
- type ErrorFunc
- type FilterProcessor
- type FlatMapProcessor
- type FlatMapper
- type FlatMapperFunc
- type MapProcessor
- type Mapper
- type MapperFunc
- type MergeProcessor
- type Message
- type Metadata
- type MetadataOrigin
- type MetadataStrategy
- type Metaitem
- type Metaitems
- type Metastore
- type Monitor
- type Node
- type Pipe
- type Predicate
- type PredicateFunc
- type PrintProcessor
- type Processor
- type ProcessorNode
- type Pump
- type Source
- type SourceNode
- type SourcePump
- type SourcePumps
- type Stream
- func (s *Stream) Branch(name string, preds ...Predicate) []*Stream
- func (s *Stream) BranchFunc(name string, preds ...PredicateFunc) []*Stream
- func (s *Stream) Filter(name string, pred Predicate) *Stream
- func (s *Stream) FilterFunc(name string, pred PredicateFunc) *Stream
- func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream
- func (s *Stream) FlatMapFunc(name string, mapper FlatMapperFunc) *Stream
- func (s *Stream) Map(name string, mapper Mapper) *Stream
- func (s *Stream) MapFunc(name string, mapper MapperFunc) *Stream
- func (s *Stream) Merge(name string, streams ...*Stream) *Stream
- func (s *Stream) Print(name string) *Stream
- func (s *Stream) Process(name string, p Processor) *Stream
- type StreamBuilder
- type Supervisor
- type Task
- type TaskMode
- type TaskOptFunc
- type Tasks
- type TimedPipe
- type Topology
- type TopologyBuilder
Variables ¶
var (
	// ErrNotRunning is returned when trying to perform an action that requires a running supervisor.
	ErrNotRunning = errors.New("streams: supervisor not running")
	// ErrAlreadyRunning is returned when starting a supervisor that has already been started.
	ErrAlreadyRunning = errors.New("streams: supervisor already running")
	// ErrUnknownPump is returned when the supervisor is unable to find a pump for a given processor.
	ErrUnknownPump = errors.New("streams: encountered an unknown pump")
)
var EmptyMessage = Message{}
EmptyMessage is a predefined empty message.
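Sentinel errors such as the ones above are usually checked with errors.Is, which also matches wrapped errors. The sketch below declares local copies of two of the variables so that it compiles on its own; fakeSupervisor and describe are hypothetical names, and the wrapping in Commit is an assumption made for illustration, not a statement about how the library returns these errors.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two sentinel errors, so the sketch is self-contained.
// In real code they would be referenced from the streams package.
var (
	ErrNotRunning     = errors.New("streams: supervisor not running")
	ErrAlreadyRunning = errors.New("streams: supervisor already running")
)

// fakeSupervisor is a hypothetical stand-in that only tracks a running flag.
type fakeSupervisor struct {
	running bool
}

func (s *fakeSupervisor) Start() error {
	if s.running {
		return ErrAlreadyRunning
	}
	s.running = true
	return nil
}

func (s *fakeSupervisor) Commit() error {
	if !s.running {
		// Wrapping with %w keeps the sentinel visible to errors.Is.
		return fmt.Errorf("commit: %w", ErrNotRunning)
	}
	return nil
}

// describe maps an error to a short label using errors.Is.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotRunning):
		return "not running"
	case errors.Is(err, ErrAlreadyRunning):
		return "already running"
	default:
		return "other: " + err.Error()
	}
}

func main() {
	s := &fakeSupervisor{}
	fmt.Println(describe(s.Commit()))
	fmt.Println(describe(s.Start()))
	fmt.Println(describe(s.Start()))
}
```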
Types ¶
type BranchProcessor ¶
type BranchProcessor struct {
// contains filtered or unexported fields
}
BranchProcessor is a processor that branches into one or more streams based on the results of the predicates.
func (*BranchProcessor) Process ¶
func (p *BranchProcessor) Process(msg Message) error
Process processes the stream Message.
func (*BranchProcessor) WithPipe ¶
func (p *BranchProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Committer ¶
type Committer interface {
	Processor

	// Commit commits a processor's batch.
	Commit(ctx context.Context) error
}
Committer represents a processor that can commit.
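One way to read the Committer contract is as a processor that buffers work in Process and flushes it in Commit. The following is a minimal sketch under that assumption: Message and Committer are reduced local stand-ins (the real Committer also embeds the full Processor interface), and batchProcessor and flushAll are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local stand-in for a stream message.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Committer is a reduced stand-in: only Process and Commit are modelled.
type Committer interface {
	Process(Message) error
	Commit(ctx context.Context) error
}

// batchProcessor is a hypothetical sink that buffers values until Commit.
type batchProcessor struct {
	batch   []interface{}
	flushed [][]interface{}
}

var _ Committer = (*batchProcessor)(nil)

// Process appends the value to the current batch.
func (p *batchProcessor) Process(msg Message) error {
	p.batch = append(p.batch, msg.Value)
	return nil
}

// Commit flushes the current batch unless the context is already done.
func (p *batchProcessor) Commit(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if len(p.batch) == 0 {
		return nil
	}
	p.flushed = append(p.flushed, p.batch)
	p.batch = nil
	return nil
}

// flushAll processes every value and then commits once.
func flushAll(p *batchProcessor, values ...interface{}) error {
	for _, v := range values {
		if err := p.Process(Message{Value: v}); err != nil {
			return err
		}
	}
	return p.Commit(context.Background())
}

func main() {
	p := &batchProcessor{}
	err := flushAll(p, "a", "b", "c")
	fmt.Println(len(p.flushed), len(p.batch), err)
}
```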
type FilterProcessor ¶
type FilterProcessor struct {
// contains filtered or unexported fields
}
FilterProcessor is a processor that filters a stream using a predicate function.
func (*FilterProcessor) Process ¶
func (p *FilterProcessor) Process(msg Message) error
Process processes the stream Message.
func (*FilterProcessor) WithPipe ¶
func (p *FilterProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapProcessor ¶
type FlatMapProcessor struct {
// contains filtered or unexported fields
}
FlatMapProcessor is a processor that maps a stream using a flat mapping function.
func (*FlatMapProcessor) Close ¶
func (p *FlatMapProcessor) Close() error
Close closes the processor.
func (*FlatMapProcessor) Process ¶
func (p *FlatMapProcessor) Process(msg Message) error
Process processes the stream Message.
func (*FlatMapProcessor) WithPipe ¶
func (p *FlatMapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapper ¶
type FlatMapper interface {
	// FlatMap transforms a message into multiple messages.
	FlatMap(Message) ([]Message, error)
}
FlatMapper represents a transformer that returns zero or many messages.
type FlatMapperFunc ¶
FlatMapperFunc represents a function implementing the FlatMapper interface.
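The function-adapter pattern described here can be sketched as follows. The declaration of FlatMapperFunc is not shown on this page, so the signature below is an assumption that simply mirrors the FlatMap method; Message is a local stand-in and splitWords is a hypothetical mapper.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for a stream message.
type Message struct {
	Key   interface{}
	Value interface{}
}

// FlatMapper mirrors the interface documented above.
type FlatMapper interface {
	FlatMap(Message) ([]Message, error)
}

// FlatMapperFunc has an assumed signature matching the FlatMap method.
type FlatMapperFunc func(Message) ([]Message, error)

// FlatMap calls the underlying function, satisfying FlatMapper.
func (fn FlatMapperFunc) FlatMap(msg Message) ([]Message, error) {
	return fn(msg)
}

// splitWords is a hypothetical flat mapper that emits one message per word.
func splitWords(msg Message) ([]Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("expected string value, got %T", msg.Value)
	}
	words := strings.Fields(s)
	out := make([]Message, 0, len(words))
	for _, w := range words {
		out = append(out, Message{Key: msg.Key, Value: w})
	}
	return out, nil
}

func main() {
	var m FlatMapper = FlatMapperFunc(splitWords)
	out, err := m.FlatMap(Message{Key: "k", Value: "to be or not"})
	fmt.Println(len(out), err)
}
```

Returning an empty slice drops the message, which is how "zero or many" is modelled in this sketch.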
type MapProcessor ¶
type MapProcessor struct {
// contains filtered or unexported fields
}
MapProcessor is a processor that maps a stream using a mapping function.
func (*MapProcessor) Process ¶
func (p *MapProcessor) Process(msg Message) error
Process processes the stream Message.
func (*MapProcessor) WithPipe ¶
func (p *MapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Mapper ¶
type Mapper interface {
	// Map transforms a message into a new value.
	Map(Message) (Message, error)
}
Mapper represents a message transformer.
type MapperFunc ¶
MapperFunc represents a function implementing the Mapper interface.
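Because a Mapper is a single-method interface, plain functions can be adapted and composed. The sketch below assumes a MapperFunc signature that mirrors the Map method (its declaration is not shown on this page); Message is a local stand-in, and chain, double and addOne are hypothetical helpers, not part of the library.

```go
package main

import "fmt"

// Message is a local stand-in for a stream message.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Mapper mirrors the interface documented above.
type Mapper interface {
	Map(Message) (Message, error)
}

// MapperFunc has an assumed signature matching the Map method.
type MapperFunc func(Message) (Message, error)

// Map calls the underlying function, satisfying Mapper.
func (fn MapperFunc) Map(msg Message) (Message, error) { return fn(msg) }

// chain applies the mappers in order and stops at the first error.
func chain(mappers ...Mapper) Mapper {
	return MapperFunc(func(msg Message) (Message, error) {
		var err error
		for _, m := range mappers {
			if msg, err = m.Map(msg); err != nil {
				return Message{}, err
			}
		}
		return msg, nil
	})
}

// intMapper builds a mapper that applies fn to int values.
func intMapper(fn func(int) int) Mapper {
	return MapperFunc(func(msg Message) (Message, error) {
		n, ok := msg.Value.(int)
		if !ok {
			return Message{}, fmt.Errorf("expected int value, got %T", msg.Value)
		}
		msg.Value = fn(n)
		return msg, nil
	})
}

func main() {
	double := intMapper(func(n int) int { return n * 2 })
	addOne := intMapper(func(n int) int { return n + 1 })

	out, err := chain(double, addOne).Map(Message{Value: 3})
	fmt.Println(out.Value, err)
}
```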
type MergeProcessor ¶
type MergeProcessor struct {
// contains filtered or unexported fields
}
MergeProcessor is a processor that merges multiple streams.
func (*MergeProcessor) Process ¶
func (p *MergeProcessor) Process(msg Message) error
Process processes the stream Message.
func (*MergeProcessor) WithPipe ¶
func (p *MergeProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Message ¶
type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
	// contains filtered or unexported fields
}
Message represents data that flows through the stream.
func NewMessageWithContext ¶
NewMessageWithContext creates a Message with the given context.
type Metadata ¶
type Metadata interface {
	// WithOrigin sets the MetadataOrigin on the metadata.
	WithOrigin(MetadataOrigin)
	// Merge merges the contained metadata into the given metadata with the given strategy.
	Merge(Metadata, MetadataStrategy) Metadata
}
Metadata represents metadata that can be merged.
type MetadataOrigin ¶
type MetadataOrigin uint8
MetadataOrigin represents the metadata origin type.
const (
	CommitterOrigin MetadataOrigin = iota
	ProcessorOrigin
)
MetadataOrigin types.
type MetadataStrategy ¶
type MetadataStrategy uint8
MetadataStrategy represents the metadata merge strategy.
const (
	Lossless MetadataStrategy = iota
	Dupless
)
MetadataStrategy types.
type Metastore ¶
type Metastore interface {
	// Pull gets and clears the processor's metadata.
	Pull(Processor) (Metaitems, error)
	// PullAll gets and clears all metadata.
	PullAll() (map[Processor]Metaitems, error)
	// Mark sets metadata for a processor.
	Mark(Processor, Source, Metadata) error
}
Metastore represents a metadata store.
Metastore is only partially concurrency safe. Mark and PullAll can be used concurrently, but Mark and Pull cannot. This is done to avoid locks and improve performance.
type Monitor ¶
type Monitor interface {
	// Processed adds a processed event to the Monitor.
	Processed(name string, l time.Duration, bp float64)
	// Committed adds a committed event to the Monitor.
	Committed(l time.Duration)
	// Close closes the monitor.
	Close() error
}
Monitor represents a stream event collector.
type Node ¶
type Node interface {
	// Name gets the node name.
	Name() string
	// AddChild adds a child node to the node.
	AddChild(n Node)
	// Children gets the node's children.
	Children() []Node
	// Processor gets the node's processor.
	Processor() Processor
}
Node represents a topology node.
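A topology built from such nodes is a tree that can be walked from a source downwards. The sketch below is an illustration only: the local Node interface omits the Processor method, and node and walk are hypothetical names that are not part of the library.

```go
package main

import "fmt"

// Node is a reduced stand-in for the interface above (no Processor method).
type Node interface {
	Name() string
	AddChild(n Node)
	Children() []Node
}

// node is a hypothetical minimal Node implementation.
type node struct {
	name     string
	children []Node
}

func (n *node) Name() string     { return n.name }
func (n *node) AddChild(c Node)  { n.children = append(n.children, c) }
func (n *node) Children() []Node { return n.children }

// walk visits the tree depth-first (pre-order) and returns the node names.
func walk(n Node) []string {
	names := []string{n.Name()}
	for _, c := range n.Children() {
		names = append(names, walk(c)...)
	}
	return names
}

func main() {
	source := &node{name: "source"}
	mapper := &node{name: "map"}
	filter := &node{name: "filter"}
	printer := &node{name: "print"}

	source.AddChild(mapper)
	mapper.AddChild(filter)
	mapper.AddChild(printer)

	fmt.Println(walk(source))
}
```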
type Pipe ¶
type Pipe interface {
	// Mark indicates that the message has been dealt with.
	Mark(Message) error
	// Forward queues the message with all processor children in the topology.
	Forward(Message) error
	// ForwardToChild queues the message with the given processor (inner) child in the topology.
	ForwardToChild(Message, int) error
	// Commit commits the current state in the related sources.
	Commit(Message) error
}
Pipe allows messages to flow through the processors.
type Predicate ¶
type Predicate interface {
	// Assert tests if the given message satisfies the predicate.
	Assert(Message) (bool, error)
}
Predicate represents a predicate (boolean-valued function) of a message.
type PredicateFunc ¶
PredicateFunc represents a function implementing the Predicate interface.
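As a rough illustration of how a predicate drives filtering, the sketch below keeps only the messages for which Assert returns true. The PredicateFunc signature is an assumption mirroring the Assert method (its declaration is not shown on this page); Message is a local stand-in, and filter and isEven are hypothetical helpers rather than the library's FilterProcessor.

```go
package main

import "fmt"

// Message is a local stand-in for a stream message.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Predicate mirrors the interface documented above.
type Predicate interface {
	Assert(Message) (bool, error)
}

// PredicateFunc has an assumed signature matching the Assert method.
type PredicateFunc func(Message) (bool, error)

// Assert calls the underlying function, satisfying Predicate.
func (fn PredicateFunc) Assert(msg Message) (bool, error) { return fn(msg) }

// isEven reports whether an int value is even.
func isEven(msg Message) (bool, error) {
	n, ok := msg.Value.(int)
	if !ok {
		return false, fmt.Errorf("expected int value, got %T", msg.Value)
	}
	return n%2 == 0, nil
}

// filter keeps the messages that satisfy pred and stops at the first error.
func filter(pred Predicate, msgs []Message) ([]Message, error) {
	var kept []Message
	for _, msg := range msgs {
		ok, err := pred.Assert(msg)
		if err != nil {
			return nil, err
		}
		if ok {
			kept = append(kept, msg)
		}
	}
	return kept, nil
}

func main() {
	msgs := []Message{{Value: 1}, {Value: 2}, {Value: 3}, {Value: 4}}
	kept, err := filter(PredicateFunc(isEven), msgs)
	fmt.Println(kept, err)
}
```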
type PrintProcessor ¶
type PrintProcessor struct {
// contains filtered or unexported fields
}
PrintProcessor is a processor that prints the stream to stdout.
func (*PrintProcessor) Process ¶
func (p *PrintProcessor) Process(msg Message) error
Process processes the stream Message.
func (*PrintProcessor) WithPipe ¶
func (p *PrintProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Processor ¶
type Processor interface {
	// WithPipe sets the pipe on the Processor.
	WithPipe(Pipe)
	// Process processes the stream Message.
	Process(Message) error
	// Close closes the processor.
	Close() error
}
Processor represents a stream processor.
func NewBranchProcessor ¶
NewBranchProcessor creates a new BranchProcessor instance.
func NewFilterProcessor ¶
NewFilterProcessor creates a new FilterProcessor instance.
func NewFlatMapProcessor ¶
func NewFlatMapProcessor(mapper FlatMapper) Processor
NewFlatMapProcessor creates a new FlatMapProcessor instance.
func NewMapProcessor ¶
NewMapProcessor creates a new MapProcessor instance.
func NewMergeProcessor ¶
func NewMergeProcessor() Processor
NewMergeProcessor creates a new MergeProcessor instance.
func NewPrintProcessor ¶
func NewPrintProcessor() Processor
NewPrintProcessor creates a new PrintProcessor instance.
type ProcessorNode ¶
type ProcessorNode struct {
// contains filtered or unexported fields
}
ProcessorNode represents the topology node for a processor.
func NewProcessorNode ¶
func NewProcessorNode(name string, p Processor) *ProcessorNode
NewProcessorNode creates a new ProcessorNode.
func (*ProcessorNode) AddChild ¶
func (n *ProcessorNode) AddChild(node Node)
AddChild adds a child node to the node.
func (*ProcessorNode) Children ¶
func (n *ProcessorNode) Children() []Node
Children gets the node's children.
func (*ProcessorNode) Processor ¶
func (n *ProcessorNode) Processor() Processor
Processor gets the node's processor.
type Pump ¶
type Pump interface {
	sync.Locker

	// Accept takes a message to be processed in the Pump.
	Accept(Message) error
	// Stop stops the pump.
	Stop()
	// Close closes the pump.
	Close() error
}
Pump represents a Message pump.
func NewAsyncPump ¶
NewAsyncPump creates a new asynchronous Pump instance.
type Source ¶
type Source interface {
	// Consume gets the next Message from the Source.
	Consume() (Message, error)
	// Commit marks the consumed Message as processed.
	Commit(interface{}) error
	// Close closes the Source.
	Close() error
}
Source represents a stream source.
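A small in-memory source shows how the three methods fit together. This is a sketch under several assumptions: Message is a local stand-in, the value passed to Commit is taken to be an int position (the page does not specify its type), and an exhausted source returns an empty message. sliceSource and isEmpty are hypothetical names.

```go
package main

import "fmt"

// Message is a local stand-in for a stream message.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Source mirrors the interface documented above.
type Source interface {
	Consume() (Message, error)
	Commit(interface{}) error
	Close() error
}

// sliceSource is a hypothetical in-memory source; its position is an index.
type sliceSource struct {
	values    []string
	next      int
	committed int
}

var _ Source = (*sliceSource)(nil)

// Consume returns the next value, or an empty message once exhausted
// (the empty-message convention is an assumption of this sketch).
func (s *sliceSource) Consume() (Message, error) {
	if s.next >= len(s.values) {
		return Message{}, nil
	}
	msg := Message{Key: s.next, Value: s.values[s.next]}
	s.next++
	return msg, nil
}

// Commit records the highest position seen; an int is assumed here.
func (s *sliceSource) Commit(v interface{}) error {
	pos, ok := v.(int)
	if !ok {
		return fmt.Errorf("expected int position, got %T", v)
	}
	if pos > s.committed {
		s.committed = pos
	}
	return nil
}

// Close has nothing to release for an in-memory source.
func (s *sliceSource) Close() error { return nil }

// isEmpty reports whether msg carries neither key nor value.
func isEmpty(msg Message) bool { return msg.Key == nil && msg.Value == nil }

func main() {
	src := &sliceSource{values: []string{"a", "b"}}
	for {
		msg, err := src.Consume()
		if err != nil || isEmpty(msg) {
			break
		}
		fmt.Println(msg.Key, msg.Value)
	}
	_ = src.Commit(src.next)
	fmt.Println("committed position:", src.committed)
	_ = src.Close()
}
```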
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
SourceNode represents a node between the source and the rest of the node tree.
func NewSourceNode ¶
func NewSourceNode(name string) *SourceNode
NewSourceNode creates a new SourceNode.
func (*SourceNode) AddChild ¶
func (n *SourceNode) AddChild(node Node)
AddChild adds a child node to the node.
func (*SourceNode) Children ¶
func (n *SourceNode) Children() []Node
Children gets the node's children.
func (*SourceNode) Processor ¶
func (n *SourceNode) Processor() Processor
Processor gets the node's processor.
type SourcePump ¶
type SourcePump interface {
	// Stop stops the source pump from running.
	Stop()
	// Close closes the source pump.
	Close() error
}
SourcePump represents a Message pump for sources.
func NewSourcePump ¶
func NewSourcePump(mon Monitor, name string, source Source, pumps []Pump, errFn ErrorFunc) SourcePump
NewSourcePump creates a new SourcePump.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a stream of data.
func (*Stream) BranchFunc ¶
func (s *Stream) BranchFunc(name string, preds ...PredicateFunc) []*Stream
BranchFunc branches a stream based on the given predicates.
func (*Stream) FilterFunc ¶
func (s *Stream) FilterFunc(name string, pred PredicateFunc) *Stream
FilterFunc filters the stream using a predicate.
func (*Stream) FlatMap ¶
func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream
FlatMap runs a flat mapper on the stream.
func (*Stream) FlatMapFunc ¶
func (s *Stream) FlatMapFunc(name string, mapper FlatMapperFunc) *Stream
FlatMapFunc runs a flat mapper on the stream.
func (*Stream) MapFunc ¶
func (s *Stream) MapFunc(name string, mapper MapperFunc) *Stream
MapFunc runs a mapper on the stream.
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
StreamBuilder represents a stream builder.
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
NewStreamBuilder creates a new StreamBuilder.
func (*StreamBuilder) Build ¶
func (sb *StreamBuilder) Build() (*Topology, []error)
Build builds the stream Topology.
type Supervisor ¶
type Supervisor interface {
	io.Closer

	// WithContext sets the context.
	WithContext(context.Context)
	// WithMonitor sets the Monitor.
	WithMonitor(Monitor)
	// WithPumps sets a map of Pumps.
	WithPumps(map[Node]Pump)
	// Start starts the supervisor.
	//
	// This function should initiate all the background tasks of the Supervisor.
	// It must not be a blocking call.
	Start() error
	// Commit performs a global commit sequence.
	//
	// If triggered by a Pipe, the associated Processor should be passed.
	Commit(Processor) error
}
Supervisor represents a concurrency-safe stream supervisor.
The Supervisor performs commits in a concurrency-safe manner. There can only ever be one ongoing commit at any given time.
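The single-commit guarantee can be illustrated with a mutex that serialises commit calls. How the Supervisor actually enforces it is not described on this page, so the sketch below is only one possible approach; commitGuard and runConcurrentCommits are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// commitGuard is a hypothetical helper that serialises commits with a mutex
// and records how many commits were ever in flight at the same time.
type commitGuard struct {
	mu       sync.Mutex
	inFlight int
	maxSeen  int
	total    int
}

// Commit runs fn while holding the lock, so two commits never overlap.
func (g *commitGuard) Commit(fn func() error) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	g.inFlight++
	if g.inFlight > g.maxSeen {
		g.maxSeen = g.inFlight
	}
	err := fn()
	g.inFlight--
	g.total++
	return err
}

// runConcurrentCommits fires n commits from separate goroutines and returns
// the highest observed concurrency and the number of completed commits.
func runConcurrentCommits(n int) (maxSeen, total int) {
	g := &commitGuard{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = g.Commit(func() error { return nil })
		}()
	}
	wg.Wait()
	return g.maxSeen, g.total
}

func main() {
	maxSeen, total := runConcurrentCommits(50)
	fmt.Println("max concurrent commits:", maxSeen, "total:", total)
}
```

A blocking mutex makes later callers wait for the ongoing commit. An implementation could equally reject or coalesce overlapping requests; the page does not say which behaviour the library chooses.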
func NewSupervisor ¶
func NewSupervisor(store Metastore, strategy MetadataStrategy) Supervisor
NewSupervisor returns a new Supervisor instance.
func NewTimedSupervisor ¶
func NewTimedSupervisor(inner Supervisor, d time.Duration, errFn ErrorFunc) Supervisor
NewTimedSupervisor returns a supervisor that commits automatically.
type Task ¶
type Task interface {
	// Start starts the streams processors.
	Start(ctx context.Context) error
	// OnError sets the error handler.
	OnError(fn ErrorFunc)
	// Close stops and closes the streams processors.
	Close() error
}
Task represents a streams task.
func NewTask ¶
func NewTask(topology *Topology, opts ...TaskOptFunc) Task
NewTask creates a new streams task.
type TaskOptFunc ¶
type TaskOptFunc func(t *streamTask)
TaskOptFunc represents a function that sets up the Task.
func WithCommitInterval ¶
func WithCommitInterval(d time.Duration) TaskOptFunc
WithCommitInterval defines an interval of automatic commits.
func WithMetadataStrategy ¶
func WithMetadataStrategy(strategy MetadataStrategy) TaskOptFunc
WithMetadataStrategy defines the strategy used to merge metadata.
func WithMonitorInterval ¶
func WithMonitorInterval(d time.Duration) TaskOptFunc
WithMonitorInterval defines an interval of stats collection.
Minimum interval is 100ms.
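The option functions above follow the functional options pattern. The sketch below shows the pattern with a local configuration struct, since the task type itself is unexported. taskConfig, optFunc, newConfig and ms are hypothetical names, the one-second default is invented for the example, and clamping values below 100ms up to the minimum is an assumption about how the minimum might be applied.

```go
package main

import (
	"fmt"
	"time"
)

// taskConfig is a hypothetical stand-in for the unexported task state.
type taskConfig struct {
	commitInterval  time.Duration
	monitorInterval time.Duration
}

// optFunc mutates the configuration, in the style of TaskOptFunc.
type optFunc func(*taskConfig)

// minMonitorInterval matches the documented 100ms minimum.
const minMonitorInterval = 100 * time.Millisecond

// withCommitInterval sets the automatic commit interval.
func withCommitInterval(d time.Duration) optFunc {
	return func(c *taskConfig) { c.commitInterval = d }
}

// withMonitorInterval sets the stats interval, raising values below the
// minimum to 100ms (the clamping behaviour is an assumption).
func withMonitorInterval(d time.Duration) optFunc {
	return func(c *taskConfig) {
		if d < minMonitorInterval {
			d = minMonitorInterval
		}
		c.monitorInterval = d
	}
}

// newConfig applies the options in order over illustrative defaults.
func newConfig(opts ...optFunc) taskConfig {
	c := taskConfig{monitorInterval: time.Second}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

// ms is a small helper that converts milliseconds to a Duration.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

func main() {
	c := newConfig(withCommitInterval(ms(500)), withMonitorInterval(ms(10)))
	fmt.Println(c.commitInterval, c.monitorInterval)
}
```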
type Tasks ¶
type Tasks []Task
Tasks represents a slice of tasks. This is a utility type that makes it easier to work with multiple tasks.
type TimedPipe ¶
type TimedPipe interface {
	// Reset resets the accumulative pipe duration.
	Reset()
	// Duration returns the accumulative pipe duration.
	Duration() time.Duration
}
TimedPipe represents a pipe that can accumulate execution time.
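Accumulating execution time can be sketched as a counter that grows with every timed call and is cleared by Reset. The example below only illustrates the two documented methods: timedPipe, add and track are hypothetical names, and what the library actually measures is not specified on this page.

```go
package main

import (
	"fmt"
	"time"
)

// TimedPipe mirrors the interface documented above.
type TimedPipe interface {
	Reset()
	Duration() time.Duration
}

// timedPipe is a hypothetical implementation that sums elapsed time.
type timedPipe struct {
	total time.Duration
}

var _ TimedPipe = (*timedPipe)(nil)

// Reset clears the accumulated duration.
func (p *timedPipe) Reset() { p.total = 0 }

// Duration returns the accumulated duration.
func (p *timedPipe) Duration() time.Duration { return p.total }

// add records d as time spent.
func (p *timedPipe) add(d time.Duration) { p.total += d }

// track runs fn and adds the time it took to the total.
func (p *timedPipe) track(fn func()) {
	start := time.Now()
	fn()
	p.add(time.Since(start))
}

func main() {
	p := &timedPipe{}
	p.track(func() { time.Sleep(2 * time.Millisecond) })
	fmt.Println("accumulated at least 2ms:", p.Duration() >= 2*time.Millisecond)
	p.Reset()
	fmt.Println("after reset:", p.Duration())
}
```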
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
Topology represents the streams topology.
func (Topology) Processors ¶
Processors gets the topology Processors.
type TopologyBuilder ¶
type TopologyBuilder struct {
// contains filtered or unexported fields
}
TopologyBuilder represents a topology builder.
func NewTopologyBuilder ¶
func NewTopologyBuilder() *TopologyBuilder
NewTopologyBuilder creates a new TopologyBuilder.
func (*TopologyBuilder) AddProcessor ¶
func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node
AddProcessor adds a Processor to the builder, returning the created Node.
func (*TopologyBuilder) AddSource ¶
func (tb *TopologyBuilder) AddSource(name string, source Source) Node
AddSource adds a Source to the builder, returning the created Node.
func (*TopologyBuilder) Build ¶
func (tb *TopologyBuilder) Build() (*Topology, []error)
Build creates an immutable Topology.