streams

package module
v6.3.0 Latest
Warning

This package is not in the latest version of its module.
Published: Nov 10, 2022 License: MIT Imports: 10 Imported by: 0

README

Streams

Streams is a lightweight, simple stream processing library. While Kafka is the main use case for Streams, it is flexible enough to be used for any form of processing from any source.

Installation

You can install streams using go get:

go get github.com/msales/streams

Concepts

Streams breaks processing into the following basic parts.

  • Message is a message in the system, consisting of a key, value and context.

  • Source reads from a data source and handles its position.

  • Processor processes the data, optionally passing it on or marking the source's position. A sink is just a processor that does not forward the data on.

  • Pipe gives processors an abstract view of the current state, allowing Messages to flow through the system.

Read more here:

https://medium.com/@rafamnich/getting-started-with-streams-v3-b9ab36fb9d54

Documentation

Constants

This section is empty.

Variables

var (
	// ErrNotRunning is returned when trying to perform an action that requires a running supervisor.
	ErrNotRunning = errors.New("streams: supervisor not running")
	// ErrAlreadyRunning is returned when starting a supervisor that has already been started.
	ErrAlreadyRunning = errors.New("streams: supervisor already running")
	// ErrUnknownPump is returned when the supervisor is unable to find a pump for a given processor.
	ErrUnknownPump = errors.New("streams: encountered an unknown pump")
)
var EmptyMessage = Message{}

EmptyMessage is a predefined empty message.

Functions

This section is empty.

Types

type BranchProcessor

type BranchProcessor struct {
	// contains filtered or unexported fields
}

BranchProcessor is a processor that branches into one or more streams based on the results of the predicates.

func (*BranchProcessor) Close

func (p *BranchProcessor) Close() error

Close closes the processor.

func (*BranchProcessor) Process

func (p *BranchProcessor) Process(msg Message) error

Process processes the stream Message.

func (*BranchProcessor) WithPipe

func (p *BranchProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Committer

type Committer interface {
	Processor

	// Commit commits a processor's batch.
	Commit(ctx context.Context) error
}

Committer represents a processor that can commit.

type ErrorFunc

type ErrorFunc func(error)

ErrorFunc represents a streams error handling function.

type FanOutProcessor added in v6.2.0

type FanOutProcessor struct {
	// contains filtered or unexported fields
}

FanOutProcessor is a processor that passes the message to multiple children.

func (*FanOutProcessor) Close added in v6.2.0

func (p *FanOutProcessor) Close() error

Close closes the processor.

func (*FanOutProcessor) Process added in v6.2.0

func (p *FanOutProcessor) Process(msg Message) error

Process processes the stream Message.

func (*FanOutProcessor) WithPipe added in v6.2.0

func (p *FanOutProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FilterProcessor

type FilterProcessor struct {
	// contains filtered or unexported fields
}

FilterProcessor is a processor that filters a stream using a predicate function.

func (*FilterProcessor) Close

func (p *FilterProcessor) Close() error

Close closes the processor.

func (*FilterProcessor) Process

func (p *FilterProcessor) Process(msg Message) error

Process processes the stream Message.

func (*FilterProcessor) WithPipe

func (p *FilterProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FlatMapProcessor

type FlatMapProcessor struct {
	// contains filtered or unexported fields
}

FlatMapProcessor is a processor that maps a stream using a flat mapping function.

func (*FlatMapProcessor) Close

func (p *FlatMapProcessor) Close() error

Close closes the processor.

func (*FlatMapProcessor) Process

func (p *FlatMapProcessor) Process(msg Message) error

Process processes the stream Message.

func (*FlatMapProcessor) WithPipe

func (p *FlatMapProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FlatMapper

type FlatMapper interface {
	// FlatMap transforms a message into multiple messages.
	FlatMap(Message) ([]Message, error)
}

FlatMapper represents a transformer that returns zero or many messages.

type FlatMapperFunc

type FlatMapperFunc func(Message) ([]Message, error)

FlatMapperFunc represents a function implementing the FlatMapper interface.

func (FlatMapperFunc) FlatMap

func (fn FlatMapperFunc) FlatMap(msg Message) ([]Message, error)

FlatMap transforms a message into multiple messages.
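The adapter pattern behind FlatMapperFunc can be sketched without the library. The block below is a minimal illustration that uses local stand-in types mirroring the documented signatures; the Message stand-in carries only Key and Value, and splitWords is a hypothetical mapper invented for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the library's Message (Key and Value only).
type Message struct {
	Key   interface{}
	Value interface{}
}

// FlatMapper mirrors the interface documented above.
type FlatMapper interface {
	FlatMap(Message) ([]Message, error)
}

// FlatMapperFunc adapts a plain function to the FlatMapper interface.
type FlatMapperFunc func(Message) ([]Message, error)

// FlatMap calls the underlying function.
func (fn FlatMapperFunc) FlatMap(msg Message) ([]Message, error) {
	return fn(msg)
}

// splitWords is a hypothetical flat mapper: it emits one message per
// whitespace-separated word of a string value, reusing the original key.
func splitWords(msg Message) ([]Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("splitWords: expected string value, got %T", msg.Value)
	}
	words := strings.Fields(s)
	out := make([]Message, 0, len(words))
	for _, w := range words {
		out = append(out, Message{Key: msg.Key, Value: w})
	}
	return out, nil
}

func main() {
	var fm FlatMapper = FlatMapperFunc(splitWords)
	msgs, err := fm.FlatMap(Message{Key: "k", Value: "a b c"})
	fmt.Println(len(msgs), err) // prints: 3 <nil>
}
```

Returning an empty slice is how such a mapper would drop a message, which matches the "zero or many" wording above.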

type MapProcessor

type MapProcessor struct {
	// contains filtered or unexported fields
}

MapProcessor is a processor that maps a stream using a mapping function.

func (*MapProcessor) Close

func (p *MapProcessor) Close() error

Close closes the processor.

func (*MapProcessor) Process

func (p *MapProcessor) Process(msg Message) error

Process processes the stream Message.

func (*MapProcessor) WithPipe

func (p *MapProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Mapper

type Mapper interface {
	// Map transforms a message into a new value.
	Map(Message) (Message, error)
}

Mapper represents a message transformer.

type MapperFunc

type MapperFunc func(Message) (Message, error)

MapperFunc represents a function implementing the Mapper interface.

func (MapperFunc) Map

func (fn MapperFunc) Map(msg Message) (Message, error)

Map transforms a message into a new value.
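As a rough illustration of how a MapperFunc might be written, the sketch below defines local stand-ins for Message, Mapper and MapperFunc with the signatures shown above. The upper function is a hypothetical mapper, and the type assertion on Value is one possible way to handle the untyped interface{} payload.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the library's Message (Key and Value only).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Mapper mirrors the interface documented above.
type Mapper interface {
	Map(Message) (Message, error)
}

// MapperFunc adapts a plain function to the Mapper interface.
type MapperFunc func(Message) (Message, error)

// Map calls the underlying function.
func (fn MapperFunc) Map(msg Message) (Message, error) {
	return fn(msg)
}

// upper is a hypothetical mapper that upper-cases a string value and
// leaves the key untouched.
func upper(msg Message) (Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return Message{}, fmt.Errorf("upper: expected string value, got %T", msg.Value)
	}
	msg.Value = strings.ToUpper(s)
	return msg, nil
}

func main() {
	var m Mapper = MapperFunc(upper)
	out, err := m.Map(Message{Key: 1, Value: "hello"})
	fmt.Println(out.Value, err) // prints: HELLO <nil>
}
```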

type MergeProcessor

type MergeProcessor struct {
	// contains filtered or unexported fields
}

MergeProcessor is a processor that merges multiple streams.

func (*MergeProcessor) Close

func (p *MergeProcessor) Close() error

Close closes the processor.

func (*MergeProcessor) Process

func (p *MergeProcessor) Process(msg Message) error

Process processes the stream Message.

func (*MergeProcessor) WithPipe

func (p *MergeProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Message

type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
	// contains filtered or unexported fields
}

Message represents data that flows through the stream.

func NewMessage

func NewMessage(k, v interface{}) Message

NewMessage creates a Message.

func NewMessageWithContext

func NewMessageWithContext(ctx context.Context, k, v interface{}) Message

NewMessageWithContext creates a Message with the given context.

func (Message) Empty

func (m Message) Empty() bool

Empty determines if the Message is empty.

func (Message) Metadata

func (m Message) Metadata() (Source, Metadata)

Metadata returns the Message Metadata.

func (Message) WithMetadata

func (m Message) WithMetadata(s Source, v Metadata) Message

WithMetadata adds metadata to the Message for a Source.

type Metadata

type Metadata interface {
	// WithOrigin sets the MetadataOrigin on the metadata.
	WithOrigin(MetadataOrigin)
	// Merge merges the contained metadata into the given metadata using the given strategy.
	Merge(Metadata, MetadataStrategy) Metadata
}

Metadata represents metadata that can be merged.

type MetadataOrigin

type MetadataOrigin uint8

MetadataOrigin represents the metadata origin type.

const (
	CommitterOrigin MetadataOrigin = iota
	ProcessorOrigin
)

MetadataOrigin types.

type MetadataStrategy

type MetadataStrategy uint8

MetadataStrategy represents the metadata merge strategy.

const (
	Lossless MetadataStrategy = iota
	Dupless
)

MetadataStrategy types.

type Metaitem

type Metaitem struct {
	Source   Source
	Metadata Metadata
}

Metaitem represents the source metadata combination.

type Metaitems

type Metaitems []*Metaitem

Metaitems represents a slice of Metaitem pointers.

func (Metaitems) Merge

func (m Metaitems) Merge(items Metaitems, strategy MetadataStrategy) Metaitems

Merge combines contents of two Metaitems objects, merging the Metadata where necessary.

type Metastore

type Metastore interface {
	// Pull gets and clears the processor's metadata.
	Pull(Processor) (Metaitems, error)
	// PullAll gets and clears all metadata.
	PullAll() (map[Processor]Metaitems, error)
	// Mark sets metadata for a processor.
	Mark(Processor, Source, Metadata) error
}

Metastore represents a metadata store.

Metastore is only partially concurrency safe. Mark and PullAll can be used concurrently, but Mark and Pull cannot. This is done to avoid locks and improve performance.

func NewMetastore

func NewMetastore() Metastore

NewMetastore creates a new Metastore instance.

type Monitor

type Monitor interface {
	// Processed adds a processed event to the Monitor.
	Processed(name string, l time.Duration, bp float64)
	// Committed adds a committed event to the Monitor.
	Committed(l time.Duration)
	// Close closes the monitor.
	Close() error
}

Monitor represents a stream event collector.

func NewMonitor

func NewMonitor(stats Stats, interval time.Duration) Monitor

NewMonitor creates a new Monitor.

type Node

type Node interface {
	// Name gets the node name.
	Name() string
	// AddChild adds a child node to the node.
	AddChild(n Node)
	// Children gets the node's children.
	Children() []Node
	// Processor gets the node's processor.
	Processor() Processor
}

Node represents a topology node.

type Pipe

type Pipe interface {
	// Mark indicates that the message has been dealt with.
	Mark(Message) error
	// Forward queues the message with all processor children in the topology.
	Forward(Message) error
	// ForwardToChild queues the message with the given processor (inner) child in the topology.
	ForwardToChild(Message, int) error
	// Commit commits the current state in the related sources.
	Commit(Message) error
}

Pipe allows messages to flow through the processors.

func NewPipe

func NewPipe(store Metastore, supervisor Supervisor, proc Processor, children []Pump) Pipe

NewPipe creates a new processorPipe instance.

type Predicate

type Predicate interface {
	// Assert tests if the given message satisfies the predicate.
	Assert(Message) (bool, error)
}

Predicate represents a predicate (boolean-valued function) of a message.

type PredicateFunc

type PredicateFunc func(Message) (bool, error)

PredicateFunc represents a function implementing the Predicate interface.

func (PredicateFunc) Assert

func (fn PredicateFunc) Assert(msg Message) (bool, error)

Assert tests if the given message satisfies the predicate.
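A predicate can be sketched in the same way. The block below uses local stand-in types that mirror the documented Predicate and PredicateFunc signatures; isEven and keep are hypothetical helpers, and keep only illustrates how a filtering step might consult a predicate, not how FilterProcessor is implemented.

```go
package main

import "fmt"

// Message is a local stand-in for the library's Message (Key and Value only).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Predicate mirrors the interface documented above.
type Predicate interface {
	Assert(Message) (bool, error)
}

// PredicateFunc adapts a plain function to the Predicate interface.
type PredicateFunc func(Message) (bool, error)

// Assert calls the underlying function.
func (fn PredicateFunc) Assert(msg Message) (bool, error) {
	return fn(msg)
}

// isEven is a hypothetical predicate over int values.
func isEven(msg Message) (bool, error) {
	n, ok := msg.Value.(int)
	if !ok {
		return false, fmt.Errorf("isEven: expected int value, got %T", msg.Value)
	}
	return n%2 == 0, nil
}

// keep returns the messages that satisfy pred, stopping at the first error.
func keep(pred Predicate, msgs []Message) ([]Message, error) {
	var out []Message
	for _, m := range msgs {
		ok, err := pred.Assert(m)
		if err != nil {
			return nil, err
		}
		if ok {
			out = append(out, m)
		}
	}
	return out, nil
}

func main() {
	in := []Message{{Value: 1}, {Value: 2}, {Value: 3}, {Value: 4}}
	out, err := keep(PredicateFunc(isEven), in)
	fmt.Println(len(out), err) // prints: 2 <nil>
}
```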

type PrintProcessor

type PrintProcessor struct {
	// contains filtered or unexported fields
}

PrintProcessor is a processor that prints the stream to stdout.

func (*PrintProcessor) Close

func (p *PrintProcessor) Close() error

Close closes the processor.

func (*PrintProcessor) Process

func (p *PrintProcessor) Process(msg Message) error

Process processes the stream Message.

func (*PrintProcessor) WithPipe

func (p *PrintProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Processor

type Processor interface {
	// WithPipe sets the pipe on the Processor.
	WithPipe(Pipe)
	// Process processes the stream Message.
	Process(Message) error
	// Close closes the processor.
	Close() error
}

Processor represents a stream processor.
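A custom processor typically stores the Pipe it receives in WithPipe and uses it inside Process. The following sketch assumes local stand-ins for Message, Pipe and Processor with the method sets shown in this reference; doubler and recordingPipe are hypothetical types written for illustration, and recordingPipe is only a test double, not the library's pipe.

```go
package main

import "fmt"

// Message is a local stand-in for the library's Message (Key and Value only).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Pipe mirrors the method set documented for Pipe.
type Pipe interface {
	Mark(Message) error
	Forward(Message) error
	ForwardToChild(Message, int) error
	Commit(Message) error
}

// Processor mirrors the interface documented above.
type Processor interface {
	WithPipe(Pipe)
	Process(Message) error
	Close() error
}

// doubler is a hypothetical processor that doubles int values and
// forwards the result to all of its children.
type doubler struct {
	pipe Pipe
}

func (p *doubler) WithPipe(pipe Pipe) { p.pipe = pipe }

func (p *doubler) Process(msg Message) error {
	n, ok := msg.Value.(int)
	if !ok {
		return fmt.Errorf("doubler: expected int value, got %T", msg.Value)
	}
	msg.Value = n * 2
	return p.pipe.Forward(msg)
}

func (p *doubler) Close() error { return nil }

// recordingPipe is a test double that records every forwarded message.
type recordingPipe struct {
	forwarded []Message
}

func (r *recordingPipe) Mark(Message) error   { return nil }
func (r *recordingPipe) Commit(Message) error { return nil }
func (r *recordingPipe) Forward(m Message) error {
	r.forwarded = append(r.forwarded, m)
	return nil
}
func (r *recordingPipe) ForwardToChild(m Message, _ int) error {
	return r.Forward(m)
}

func main() {
	pipe := &recordingPipe{}
	var p Processor = &doubler{}
	p.WithPipe(pipe)
	if err := p.Process(Message{Key: "a", Value: 21}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(pipe.forwarded[0].Value) // prints: 42
	_ = p.Close()
}
```

A sink, in the sense used in the README, would look the same except that Process would call Mark (or nothing) instead of Forward.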

func NewBranchProcessor

func NewBranchProcessor(preds []Predicate) Processor

NewBranchProcessor creates a new BranchProcessor instance.

func NewFanOutProcessor added in v6.2.0

func NewFanOutProcessor(streams int) Processor

NewFanOutProcessor creates a new FanOutProcessor instance.

func NewFilterProcessor

func NewFilterProcessor(pred Predicate) Processor

NewFilterProcessor creates a new FilterProcessor instance.

func NewFlatMapProcessor

func NewFlatMapProcessor(mapper FlatMapper) Processor

NewFlatMapProcessor creates a new FlatMapProcessor instance.

func NewMapProcessor

func NewMapProcessor(mapper Mapper) Processor

NewMapProcessor creates a new MapProcessor instance.

func NewMergeProcessor

func NewMergeProcessor() Processor

NewMergeProcessor creates a new MergeProcessor instance.

func NewPrintProcessor

func NewPrintProcessor() Processor

NewPrintProcessor creates a new PrintProcessor instance.

type ProcessorNode

type ProcessorNode struct {
	// contains filtered or unexported fields
}

ProcessorNode represents the topology node for a processor.

func NewProcessorNode

func NewProcessorNode(name string, p Processor) *ProcessorNode

NewProcessorNode creates a new ProcessorNode.

func (*ProcessorNode) AddChild

func (n *ProcessorNode) AddChild(node Node)

AddChild adds a child node to the node.

func (*ProcessorNode) Children

func (n *ProcessorNode) Children() []Node

Children gets the node's children.

func (*ProcessorNode) Name

func (n *ProcessorNode) Name() string

Name gets the node name.

func (*ProcessorNode) Processor

func (n *ProcessorNode) Processor() Processor

Processor gets the node's processor.

type Pump

type Pump interface {
	sync.Locker

	// Accept takes a message to be processed in the Pump.
	Accept(Message) error
	// Stop stops the pump.
	Stop()
	// Close closes the pump.
	Close() error
}

Pump represents a Message pump.

func NewAsyncPump

func NewAsyncPump(mon Monitor, node Node, pipe TimedPipe, errFn ErrorFunc) Pump

NewAsyncPump creates a new asynchronous Pump instance.

func NewSyncPump

func NewSyncPump(mon Monitor, node Node, pipe TimedPipe) Pump

NewSyncPump creates a new synchronous Pump instance.

type Source

type Source interface {
	// Consume gets the next Message from the Source.
	Consume() (Message, error)
	// Commit marks the consumed Message as processed.
	Commit(interface{}) error
	// Close closes the Source.
	Close() error
}

Source represents a stream source.
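To make the Source contract more concrete, here is a minimal in-memory sketch built on local stand-in types. Several details are assumptions of this example rather than documented behavior: sliceSource and drain are hypothetical, a zero Message signals that nothing is left to read, and the value passed to Commit is a plain int position.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for the library's Message (Key and Value only).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Source mirrors the interface documented above.
type Source interface {
	Consume() (Message, error)
	Commit(interface{}) error
	Close() error
}

// sliceSource is a hypothetical source backed by a slice of strings.
type sliceSource struct {
	values    []string
	next      int // index of the next value to hand out
	committed int // last committed position
	closed    bool
}

// Consume returns the next value, or a zero Message once exhausted
// (an assumption of this sketch).
func (s *sliceSource) Consume() (Message, error) {
	if s.closed {
		return Message{}, errors.New("sliceSource: closed")
	}
	if s.next >= len(s.values) {
		return Message{}, nil
	}
	msg := Message{Key: s.next, Value: s.values[s.next]}
	s.next++
	return msg, nil
}

// Commit records the given int position as processed.
func (s *sliceSource) Commit(v interface{}) error {
	pos, ok := v.(int)
	if !ok {
		return fmt.Errorf("sliceSource: expected int position, got %T", v)
	}
	s.committed = pos
	return nil
}

func (s *sliceSource) Close() error {
	s.closed = true
	return nil
}

// drain consumes until a zero Message arrives, then commits the count.
func drain(src Source) (int, error) {
	n := 0
	for {
		msg, err := src.Consume()
		if err != nil {
			return n, err
		}
		if msg.Value == nil {
			break
		}
		n++
	}
	return n, src.Commit(n)
}

func main() {
	src := &sliceSource{values: []string{"a", "b", "c"}}
	n, err := drain(src)
	fmt.Println(n, src.committed, err) // prints: 3 3 <nil>
	_ = src.Close()
}
```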

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

SourceNode represents a node between the source and the rest of the node tree.

func NewSourceNode

func NewSourceNode(name string) *SourceNode

NewSourceNode creates a new SourceNode.

func (*SourceNode) AddChild

func (n *SourceNode) AddChild(node Node)

AddChild adds a child node to the node.

func (*SourceNode) Children

func (n *SourceNode) Children() []Node

Children gets the node's children.

func (*SourceNode) Name

func (n *SourceNode) Name() string

Name gets the node name.

func (*SourceNode) Processor

func (n *SourceNode) Processor() Processor

Processor gets the node's processor.

type SourcePump

type SourcePump interface {
	// Stop stops the source pump from running.
	Stop()
	// Close closes the source pump.
	Close() error
}

SourcePump represents a Message pump for sources.

func NewSourcePump

func NewSourcePump(mon Monitor, name string, source Source, pumps []Pump, errFn ErrorFunc) SourcePump

NewSourcePump creates a new SourcePump.

type SourcePumps

type SourcePumps []SourcePump

SourcePumps represents a set of source pumps.

func (SourcePumps) StopAll

func (p SourcePumps) StopAll()

StopAll stops all source pumps.

type Stats

type Stats interface {
	// Inc increments a count by the value.
	Inc(name string, value int64, tags ...interface{})
	// Gauge measures the value of a metric.
	Gauge(name string, value float64, tags ...interface{})
	// Timing sends the value of a Duration.
	Timing(name string, value time.Duration, tags ...interface{})
}

Stats represents a stats instance.
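Any type with these three methods can serve as a stats backend. The sketch below is a hypothetical in-memory implementation, useful mostly in tests; the Stats interface is redeclared locally with the documented signatures, tags are ignored, and the mutex is a design choice of this example, not a documented requirement.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Stats mirrors the interface documented above.
type Stats interface {
	Inc(name string, value int64, tags ...interface{})
	Gauge(name string, value float64, tags ...interface{})
	Timing(name string, value time.Duration, tags ...interface{})
}

// memStats is a hypothetical in-memory Stats implementation.
type memStats struct {
	mu      sync.Mutex
	counts  map[string]int64
	gauges  map[string]float64
	timings map[string][]time.Duration
}

func newMemStats() *memStats {
	return &memStats{
		counts:  make(map[string]int64),
		gauges:  make(map[string]float64),
		timings: make(map[string][]time.Duration),
	}
}

// Inc adds value to the named counter.
func (s *memStats) Inc(name string, value int64, _ ...interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counts[name] += value
}

// Gauge overwrites the named gauge with the latest value.
func (s *memStats) Gauge(name string, value float64, _ ...interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.gauges[name] = value
}

// Timing appends the duration to the named series.
func (s *memStats) Timing(name string, value time.Duration, _ ...interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.timings[name] = append(s.timings[name], value)
}

func main() {
	stats := newMemStats()
	var s Stats = stats
	s.Inc("messages", 2)
	s.Inc("messages", 3)
	s.Gauge("lag", 0.5)
	s.Timing("latency", 20*time.Millisecond)
	fmt.Println(stats.counts["messages"], stats.gauges["lag"], len(stats.timings["latency"]))
	// prints: 5 0.5 1
}
```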

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a stream of data.

func (*Stream) Branch

func (s *Stream) Branch(name string, preds ...Predicate) []*Stream

Branch branches a stream based on the given predicates.

func (*Stream) BranchFunc

func (s *Stream) BranchFunc(name string, preds ...PredicateFunc) []*Stream

BranchFunc branches a stream based on the given predicates.

func (*Stream) FanOut added in v6.2.0

func (s *Stream) FanOut(name string, number int) []*Stream

FanOut creates multiple streams based on the number of streams necessary. It should be used when the same message is supposed to be processed by multiple sinks.
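The idea of fanning out can be shown independently of the library: every child receives the same message. The sketch below is only an illustration of that idea; the child type and the fanOut function are hypothetical, and the choice to stop at the first failing child is an assumption, not the documented behavior of FanOutProcessor.

```go
package main

import "fmt"

// Message is a local stand-in for the library's Message (Key and Value only).
type Message struct {
	Key   interface{}
	Value interface{}
}

// child is a hypothetical stand-in for a downstream stream or sink.
type child func(Message) error

// fanOut delivers the same message to every child, in order, and
// stops at the first child that reports an error.
func fanOut(msg Message, children []child) error {
	for i, c := range children {
		if err := c(msg); err != nil {
			return fmt.Errorf("fanOut: child %d: %w", i, err)
		}
	}
	return nil
}

func main() {
	var seen []string
	// record builds a child that notes which sink saw which value.
	record := func(name string) child {
		return func(m Message) error {
			seen = append(seen, fmt.Sprintf("%s:%v", name, m.Value))
			return nil
		}
	}
	err := fanOut(Message{Key: "k", Value: 7}, []child{record("db"), record("log")})
	fmt.Println(seen, err) // prints: [db:7 log:7] <nil>
}
```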

func (*Stream) Filter

func (s *Stream) Filter(name string, pred Predicate) *Stream

Filter filters the stream using a predicate.

func (*Stream) FilterFunc

func (s *Stream) FilterFunc(name string, pred PredicateFunc) *Stream

FilterFunc filters the stream using a predicate.

func (*Stream) FlatMap

func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream

FlatMap runs a flat mapper on the stream.

func (*Stream) FlatMapFunc

func (s *Stream) FlatMapFunc(name string, mapper FlatMapperFunc) *Stream

FlatMapFunc runs a flat mapper on the stream.

func (*Stream) Map

func (s *Stream) Map(name string, mapper Mapper) *Stream

Map runs a mapper on the stream.

func (*Stream) MapFunc

func (s *Stream) MapFunc(name string, mapper MapperFunc) *Stream

MapFunc runs a mapper on the stream.

func (*Stream) Merge

func (s *Stream) Merge(name string, streams ...*Stream) *Stream

Merge merges one or more streams into this stream.

func (*Stream) Print

func (s *Stream) Print(name string) *Stream

Print prints the data in the stream.

func (*Stream) Process

func (s *Stream) Process(name string, p Processor) *Stream

Process runs a custom processor on the stream.

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

StreamBuilder represents a stream builder.

func NewStreamBuilder

func NewStreamBuilder() *StreamBuilder

NewStreamBuilder creates a new StreamBuilder.

func (*StreamBuilder) Build

func (sb *StreamBuilder) Build() (*Topology, []error)

Build builds the stream Topology.

func (*StreamBuilder) Source

func (sb *StreamBuilder) Source(name string, source Source) *Stream

Source adds a Source to the stream, returning the Stream.

type Supervisor

type Supervisor interface {
	io.Closer

	// WithContext sets the context.
	WithContext(context.Context)

	// WithMonitor sets the Monitor.
	WithMonitor(Monitor)

	// WithPumps sets a map of Pumps.
	WithPumps(map[Node]Pump)

	// Start starts the supervisor.
	//
	// This function should initiate all the background tasks of the Supervisor.
	// It must not be a blocking call.
	Start() error

	// Commit performs a global commit sequence.
	//
	// If triggered by a Pipe, the associated Processor should be passed.
	Commit(Processor) error
}

Supervisor represents a concurrency-safe stream supervisor.

The Supervisor performs commits in a concurrency-safe manner. There can only ever be one ongoing commit at any given time.

func NewSupervisor

func NewSupervisor(store Metastore, strategy MetadataStrategy) Supervisor

NewSupervisor returns a new Supervisor instance.

func NewTimedSupervisor

func NewTimedSupervisor(inner Supervisor, d time.Duration, errFn ErrorFunc) Supervisor

NewTimedSupervisor returns a supervisor that commits automatically.

type Task

type Task interface {
	// Start starts the streams processors.
	Start(ctx context.Context) error
	// OnError sets the error handler.
	OnError(fn ErrorFunc)
	// Close stops and closes the streams processors.
	Close() error
}

Task represents a streams task.

func NewTask

func NewTask(topology *Topology, opts ...TaskOptFunc) Task

NewTask creates a new streams task.

type TaskMode

type TaskMode int8

TaskMode represents the task mode.

const (
	Async TaskMode = iota
	Sync
)

TaskMode types.

type TaskOptFunc

type TaskOptFunc func(t *streamTask)

TaskOptFunc represents a function that sets up the Task.

func WithCommitInterval

func WithCommitInterval(d time.Duration) TaskOptFunc

WithCommitInterval defines an interval of automatic commits.

func WithMetadataStrategy

func WithMetadataStrategy(strategy MetadataStrategy) TaskOptFunc

WithMetadataStrategy defines a strategy of metadata mergers.

func WithMode

func WithMode(m TaskMode) TaskOptFunc

WithMode defines the task mode to run in.

func WithMonitorInterval

func WithMonitorInterval(d time.Duration) TaskOptFunc

WithMonitorInterval defines an interval of stats collection.

Minimum interval is 100ms.

func WithStats

func WithStats(stats Stats) TaskOptFunc

WithStats sets the stats handler.
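The With... functions above follow the functional options pattern. The following sketch shows the pattern with a local, hypothetical config type instead of the library's unexported streamTask. The default values and the clamping of the monitor interval to 100ms are assumptions made for illustration; the reference only states that 100ms is the minimum.

```go
package main

import (
	"fmt"
	"time"
)

// TaskMode mirrors the documented type and constants.
type TaskMode int8

const (
	Async TaskMode = iota
	Sync
)

// minMonitorInterval reflects the documented 100ms minimum.
const minMonitorInterval = 100 * time.Millisecond

// config is a hypothetical stand-in for the task being configured.
type config struct {
	mode            TaskMode
	commitInterval  time.Duration
	monitorInterval time.Duration
}

// OptFunc is a local stand-in for TaskOptFunc.
type OptFunc func(*config)

// WithMode sets the mode to run in.
func WithMode(m TaskMode) OptFunc {
	return func(c *config) { c.mode = m }
}

// WithCommitInterval sets the automatic commit interval.
func WithCommitInterval(d time.Duration) OptFunc {
	return func(c *config) { c.commitInterval = d }
}

// WithMonitorInterval sets the stats interval, clamped to the minimum
// (clamping is an assumption of this sketch).
func WithMonitorInterval(d time.Duration) OptFunc {
	return func(c *config) {
		if d < minMonitorInterval {
			d = minMonitorInterval
		}
		c.monitorInterval = d
	}
}

// newConfig applies the options over arbitrary example defaults.
func newConfig(opts ...OptFunc) *config {
	c := &config{mode: Async, monitorInterval: time.Second}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(
		WithMode(Sync),
		WithCommitInterval(5*time.Second),
		WithMonitorInterval(10*time.Millisecond),
	)
	fmt.Println(c.mode == Sync, c.commitInterval, c.monitorInterval)
	// prints: true 5s 100ms
}
```

With the real package, the equivalent call would pass such options as the variadic opts argument of NewTask.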

type Tasks

type Tasks []Task

Tasks represents a slice of tasks. This is a utility type that makes it easier to work with multiple tasks.

func (Tasks) Close

func (tasks Tasks) Close() error

Close stops and closes the streams processors. This function operates on the tasks in the reversed order.
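Closing in reverse order means the task started last is closed first. A minimal sketch of that ordering, using hypothetical Closer, namedCloser and closeAll names, might look like this; continuing after a failed Close and returning the first error is a choice of this sketch, since the reference does not say how errors are handled.

```go
package main

import (
	"errors"
	"fmt"
)

// Closer is a local stand-in for anything with a Close method, such as a Task.
type Closer interface {
	Close() error
}

// errBoom is a sample error used to demonstrate error propagation.
var errBoom = errors.New("boom")

// namedCloser is a hypothetical closer that records the order of Close calls.
type namedCloser struct {
	name  string
	order *[]string
	err   error
}

func (c namedCloser) Close() error {
	*c.order = append(*c.order, c.name)
	return c.err
}

// closeAll closes the items from last to first. It keeps going after a
// failure and returns the first error encountered.
func closeAll(items []Closer) error {
	var firstErr error
	for i := len(items) - 1; i >= 0; i-- {
		if err := items[i].Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	var order []string
	items := []Closer{
		namedCloser{name: "first", order: &order},
		namedCloser{name: "second", order: &order, err: errBoom},
		namedCloser{name: "third", order: &order},
	}
	err := closeAll(items)
	fmt.Println(order, err) // prints: [third second first] boom
}
```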

func (Tasks) OnError

func (tasks Tasks) OnError(fn ErrorFunc)

OnError sets the error handler.

func (Tasks) Start

func (tasks Tasks) Start(ctx context.Context) error

Start starts the streams processors.

type TimedPipe

type TimedPipe interface {
	// Reset resets the accumulative pipe duration.
	Reset()
	// Duration returns the accumulative pipe duration.
	Duration() time.Duration
}

TimedPipe represents a pipe that can accumulate execution time.
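The accumulate-and-reset behavior described by TimedPipe can be sketched with a small hypothetical type. The interface is redeclared locally with the documented methods; timer, add and track are names invented for this example, and how the library actually measures time inside a pipe is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// TimedPipe mirrors the interface documented above.
type TimedPipe interface {
	Reset()
	Duration() time.Duration
}

// timer is a hypothetical accumulator of execution time.
type timer struct {
	total time.Duration
}

// Reset clears the accumulated duration.
func (t *timer) Reset() { t.total = 0 }

// Duration returns the accumulated duration.
func (t *timer) Duration() time.Duration { return t.total }

// add accumulates an already measured duration.
func (t *timer) add(d time.Duration) { t.total += d }

// track runs fn and accumulates the time it took.
func (t *timer) track(fn func()) {
	start := time.Now()
	fn()
	t.add(time.Since(start))
}

func main() {
	tm := &timer{}
	var tp TimedPipe = tm

	tm.add(2 * time.Millisecond)
	tm.add(3 * time.Millisecond)
	fmt.Println(tp.Duration()) // prints: 5ms

	tp.Reset()
	fmt.Println(tp.Duration()) // prints: 0s

	// track measures real elapsed time, so its result varies per run.
	tm.track(func() { time.Sleep(time.Millisecond) })
	fmt.Println(tp.Duration() > 0)
}
```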

type Topology

type Topology struct {
	// contains filtered or unexported fields
}

Topology represents the streams topology.

func (Topology) Processors

func (t Topology) Processors() []Node

Processors gets the topology Processors.

func (Topology) Sources

func (t Topology) Sources() map[Source]Node

Sources gets the topology Sources.

type TopologyBuilder

type TopologyBuilder struct {
	// contains filtered or unexported fields
}

TopologyBuilder represents a topology builder.

func NewTopologyBuilder

func NewTopologyBuilder() *TopologyBuilder

NewTopologyBuilder creates a new TopologyBuilder.

func (*TopologyBuilder) AddProcessor

func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node

AddProcessor adds a Processor to the builder, returning the created Node.

func (*TopologyBuilder) AddSource

func (tb *TopologyBuilder) AddSource(name string, source Source) Node

AddSource adds a Source to the builder, returning the created Node.

func (*TopologyBuilder) Build

func (tb *TopologyBuilder) Build() (*Topology, []error)

Build creates an immutable Topology.

Directories

Path Synopsis
example