streams

Version: v0.0.0-...-6803a88
Published: Mar 29, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

README

Streams is an efficient stream processing library written in Go.

It is inspired by Kafka Streams, but intends to be source and sink agnostic.

A Stream is a DAG of Sources, Processors, Sinks and Stores.

Under heavy development.

Contact

Bruno Moura brunotm@gmail.com

License

Streams source code is available under the Apache License, Version 2.0.

Documentation

Constants

This section is empty.

Variables

var (
	// ErrInvalidForward is returned when forwarding from a non-active context,
	// i.e. outside a processor.Process() call, when forwarding from a processor
	// with no successors, or when forwarding from a sink processor. A source
	// processor context is always active.
	ErrInvalidForward = errors.New("invalid forward")

	// ErrStoreNotFound is returned when the requested store
	// doesn't exist in the Stream topology.
	ErrStoreNotFound = errors.New("store not found")
)
var (
	// ErrKeyNotFound is returned when a key is not found on a get from the store.
	ErrKeyNotFound = errors.New("key not found")
)

Functions

This section is empty.

Types

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for byte slices so that they can be used as the Key or Value in a Record.

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

Encode serializes the encoder data as []byte

type Closer

type Closer interface {
	Close() (err error)
}

Closer interface. Any Processor or Store that must be closed on Stream termination must implement this interface.

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config is a configuration object safe for concurrent gets but not for sets. Configuration items are specified by a path of dot-separated names, both for setting and for getting values. Valid paths:

	a
	a.nest.key
	a.nest.key.array.#      for set, to append to an array
	a.nest.key.array.#.key  for set, to append a nested element to an array
	a.nest.key.array.2      for set or get, the 3rd element of an array
	a.nest.key.array.2.key  for set or get, a nested element of the 3rd array element
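
To make the path grammar concrete, here is a minimal sketch of how a dot-separated path could be resolved against nested maps and slices. The lookup function is a hypothetical helper written for this illustration; it is not the package's implementation, it covers gets only, and it ignores the "#" append form.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// lookup resolves a dot-separated path against nested maps and slices.
// Map segments are used as keys; slice segments must be zero-based indexes.
func lookup(data interface{}, path string) (interface{}, bool) {
	cur := data
	for _, seg := range strings.Split(path, ".") {
		switch node := cur.(type) {
		case map[string]interface{}:
			next, ok := node[seg]
			if !ok {
				return nil, false
			}
			cur = next
		case []interface{}:
			i, err := strconv.Atoi(seg)
			if err != nil || i < 0 || i >= len(node) {
				return nil, false
			}
			cur = node[i]
		default:
			// A scalar was reached before the path was consumed.
			return nil, false
		}
	}
	return cur, true
}

func main() {
	cfg := map[string]interface{}{
		"a": map[string]interface{}{
			"nest": map[string]interface{}{
				"key": map[string]interface{}{
					"array": []interface{}{
						"zero", "one", map[string]interface{}{"key": "third"},
					},
				},
			},
		},
	}
	v, ok := lookup(cfg, "a.nest.key.array.2.key")
	fmt.Println(v, ok) // prints "third true"
	_, ok = lookup(cfg, "a.nest.key.array.9")
	fmt.Println(ok) // prints "false"
}
```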

func NewConfig

func NewConfig(data map[string]interface{}) (c Config)

NewConfig creates a config from an existing map[string]interface{} or an empty Config if nil is provided.

func (Config) Array

func (c Config) Array() (value []Config)

Array returns the current Config item as a config array. It returns nil if the item is not found or its value is not an array.

func (Config) Bool

func (c Config) Bool(def bool) (value bool)

Bool returns the bool value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a bool value.

func (Config) Duration

func (c Config) Duration(def time.Duration) (value time.Duration)

Duration returns the time.Duration value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a time.Duration value.

func (Config) Float64

func (c Config) Float64(def float64) (value float64)

Float64 returns the float64 value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a float64 value.

func (Config) Get

func (c Config) Get(path ...string) (config Config)

Get retrieves the config item for the given path. The path can be specified as a single dot-separated string or as a variadic list of keys representing the path within the config.

func (Config) Int

func (c Config) Int(def int) (value int)

Int returns the int value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to an int value.

func (Config) Int64

func (c Config) Int64(def int64) (value int64)

Int64 returns the int64 value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to an int64 value.

func (Config) IsSet

func (c Config) IsSet(path ...string) (ok bool)

IsSet returns true if path is set. The path can be a single dot-separated string or a variadic list of keys representing the path within the config.

func (Config) Map

func (c Config) Map() (value map[string]Config)

Map returns the current Config item as a config map. It returns nil if the item is not found or its value is not an object.

func (Config) Set

func (c Config) Set(value interface{}, path ...string)

Set sets the value for the given path.

func (Config) String

func (c Config) String(def string) (value string)

String returns the string value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a string value.

func (Config) Time

func (c Config) Time(def time.Time) (value time.Time)

Time returns the time.Time value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a time.Time value.

func (Config) Uint

func (c Config) Uint(def uint) (value uint)

Uint returns the uint value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a uint value.

func (Config) Uint64

func (c Config) Uint64(def uint64) (value uint64)

Uint64 returns the uint64 value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a uint64 value.

type Encoder

type Encoder interface {
	Encode() ([]byte, error)
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Record.
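
Because Encoder has a single method, any application type can serve as a record key or value. The sketch below redeclares Encoder and StringEncoder locally so it compiles on its own; the body of StringEncoder.Encode is an assumption about the obvious implementation, and JSONEncoder is a hypothetical type that is not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder mirrors the interface documented above.
type Encoder interface {
	Encode() ([]byte, error)
}

// StringEncoder mirrors the documented string-backed encoder
// (assumed to return the string's bytes unchanged).
type StringEncoder string

// Encode returns the string as a byte slice.
func (s StringEncoder) Encode() ([]byte, error) { return []byte(s), nil }

// JSONEncoder is a hypothetical Encoder that serializes any value as JSON.
type JSONEncoder struct {
	V interface{}
}

// Encode marshals the wrapped value to JSON bytes.
func (j JSONEncoder) Encode() ([]byte, error) {
	return json.Marshal(j.V)
}

func main() {
	encoders := []Encoder{
		StringEncoder("user-42"),
		JSONEncoder{V: map[string]int{"clicks": 3}},
	}
	for _, e := range encoders {
		b, err := e.Encode()
		if err != nil {
			fmt.Println("encode error:", err)
			continue
		}
		fmt.Printf("%s\n", b)
	}
}
```

Running it prints `user-42` followed by `{"clicks":3}`.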

type Error

type Error struct {
	Node   *Node
	Error  error
	Record []Record
}

Error generated by the stream components

type Initializer

type Initializer interface {
	Init(pc ProcessorContext) (err error)
}

Initializer interface. Any Processor or Store that must be initialized before running tasks in the stream must implement this interface.
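
Initializer and Closer are optional interfaces: a component opts in simply by having the method. The following sketch shows how a runtime could detect them with type assertions. It is an illustration under assumptions, not the Stream's actual start-up code: ProcessorContext is reduced to an empty interface, and counter, stateless, initAll and closeAll are hypothetical names.

```go
package main

import "fmt"

// ProcessorContext is reduced to an empty interface (assumption) because
// this sketch never calls any of its methods.
type ProcessorContext interface{}

// Initializer and Closer mirror the documented interfaces.
type Initializer interface {
	Init(pc ProcessorContext) (err error)
}

type Closer interface {
	Close() (err error)
}

// counter is a hypothetical component that needs both setup and teardown.
type counter struct{ ready bool }

func (c *counter) Init(pc ProcessorContext) error { c.ready = true; return nil }
func (c *counter) Close() error                   { c.ready = false; return nil }

// stateless is a hypothetical component that implements neither interface.
type stateless struct{}

// initAll calls Init on every component that implements Initializer and
// skips the rest. It returns how many components were initialized.
func initAll(pc ProcessorContext, components ...interface{}) (int, error) {
	n := 0
	for _, c := range components {
		if i, ok := c.(Initializer); ok {
			if err := i.Init(pc); err != nil {
				return n, err
			}
			n++
		}
	}
	return n, nil
}

// closeAll calls Close on every component that implements Closer.
func closeAll(components ...interface{}) (int, error) {
	n := 0
	for _, c := range components {
		if cl, ok := c.(Closer); ok {
			if err := cl.Close(); err != nil {
				return n, err
			}
			n++
		}
	}
	return n, nil
}

func main() {
	c := &counter{}
	n, err := initAll(nil, c, stateless{})
	fmt.Println("initialized:", n, "err:", err)
	n, err = closeAll(c, stateless{})
	fmt.Println("closed:", n, "err:", err)
}
```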

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node of a topology. Can be a source, sink, or processor node.

func (*Node) Name

func (n *Node) Name() (name string)

Name of node

func (*Node) Predecessors

func (n *Node) Predecessors() (predecessors []*Node)

Predecessors of node

func (*Node) Successors

func (n *Node) Successors() (sucessors []*Node)

Successors of node

func (*Node) Type

func (n *Node) Type() (typ types.Type)

Type of node

type Processor

type Processor interface {
	Process(pc ProcessorContext, record Record)
}

Processor of records in a Stream. Both processors and sinks must implement this interface.

type ProcessorContext

type ProcessorContext interface {
	// NodeName returns the current node name.
	NodeName() (name string)
	// StreamName returns the stream name.
	StreamName() (name string)
	// Config returns the stream app configuration.
	Config() (config Config)
	// IsActive returns if this context is active and can forward records to the stream.
	IsActive() (active bool)
	// Store returns the store with the given name
	Store(name string) (store Store, err error)
	// Forward the record to the downstream processors. Can be called multiple times
	// within Processor.Process() in order to send correlated or windowed records.
	Forward(record Record) (err error)
	// ForwardTo is like Forward, but it forwards the record only to the given node.
	ForwardTo(to string, record Record) (err error)
	// Error emits an error event to be handled by the Stream.
	Error(err error, records ...Record)
}

ProcessorContext is an execution context within a stream. It provides stream, task and processor information, routing of records to child processors, access to configured stores and contextual logging.

type ProcessorFunc

type ProcessorFunc func(pc ProcessorContext, record Record)

ProcessorFunc implements a Processor for a function type

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(pc ProcessorContext, record Record)

Process the given record
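
ProcessorFunc follows the same adapter idea as http.HandlerFunc in the standard library: a plain function gains a Process method and so satisfies Processor. The sketch below redeclares trimmed-down versions of Record, ProcessorContext and Processor (only the members it uses, which is an assumption for brevity); collector and upper are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Record and ProcessorContext are trimmed-down stand-ins for the
// documented types; only the members used below are reproduced.
type Record struct {
	Topic string
	Value string
}

type ProcessorContext interface {
	Forward(record Record) (err error)
}

type Processor interface {
	Process(pc ProcessorContext, record Record)
}

// ProcessorFunc adapts a plain function to the Processor interface.
type ProcessorFunc func(pc ProcessorContext, record Record)

// Process calls f(pc, record).
func (f ProcessorFunc) Process(pc ProcessorContext, record Record) { f(pc, record) }

// collector is a hypothetical context that remembers forwarded records.
type collector struct{ out []Record }

func (c *collector) Forward(r Record) error {
	c.out = append(c.out, r)
	return nil
}

// upper is a stateless processor written as a plain function.
func upper(pc ProcessorContext, r Record) {
	r.Value = strings.ToUpper(r.Value)
	// The error is ignored here for brevity; the documented context also
	// offers an Error method for reporting failures.
	_ = pc.Forward(r)
}

func main() {
	var p Processor = ProcessorFunc(upper)
	c := &collector{}
	p.Process(c, Record{Topic: "events", Value: "hello"})
	fmt.Println(c.out[0].Value) // prints "HELLO"
}
```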

type ProcessorSupplier

type ProcessorSupplier func() Processor

ProcessorSupplier instantiates Processors used to create a Stream topology, recreate them or clone a Stream. If further configuration is needed, the processor must implement the Initializer interface in order to initialize itself before the Stream start and access configuration parameters through the provided context.

type ROStore

type ROStore interface {

	// Name returns this store name.
	Name() (name string)

	// Get value for the given key.
	Get(key []byte) (value []byte, err error)

	// Range iterates the store in byte-wise lexicographical sorting order
	// within the given key range applying the callback for the key value pairs.
	// Returning an error causes the iteration to stop.
	// A nil from or to sets the iterator to the beginning or end of the Store.
	// Setting both from and to as nil iterates the whole store.
	// The key and value must be treated as immutable and read-only.
	// Key and value bytes remain available only during the callback call and
	// must be copied if outside use is needed.
	Range(from, to []byte, callback func(key, value []byte) error) (err error)

	// RangePrefix iterates the store over a key prefix applying the callback
	// for the key value pairs. Returning an error causes the iteration to stop.
	// The key and value must be treated as immutable and read-only.
	// Key and value bytes remain available only during the callback call and
	// must be copied if outside use is needed.
	RangePrefix(prefix []byte, cb func(key, value []byte) error) (err error)
}

ROStore is a read only key/value store

type Record

type Record struct {
	Topic string    // Topic to which this Record is associated
	Key   Encoder   // Record Key
	Value Encoder   // Record Value
	Time  time.Time // Record time
	// contains filtered or unexported fields
}

Record represents a single record within a stream

func NewRecord

func NewRecord(topic string, key, value Encoder, ts time.Time, ack func() error) (record Record)

NewRecord creates a new record. Key and ack are optional and can be set to nil.

func (Record) Ack

func (r Record) Ack() (err error)

Ack acknowledges to the record source that the record has been processed.

func (Record) IsValid

func (r Record) IsValid() (valid bool)

IsValid reports whether this record contains any data.

type Remover

type Remover interface {
	Remove() (err error)
}

Remover interface. Any Store that must clear its data or state must implement this interface. Remove must ensure releasing and closing of resources.

type Source

type Source interface {
	Processor
	Consume(pc ProcessorContext)
}

Source is a source of records in a Stream.

type SourceSupplier

type SourceSupplier func() Store

SourceSupplier instantiates Sources used to create a Stream topology, recreate them or clone a Stream. If further configuration is needed, the source must implement the Initializer interface in order to initialize itself before the Stream start and access configuration parameters through the provided context.

type Store

type Store interface {
	Processor
	ROStore

	// Set the value for the given key.
	// If TTL is greater than 0 it will set an expiry time for the key.
	Set(key, value []byte) (err error)

	// Delete the given key and associated value
	Delete(key []byte) (err error)
}

Store is a read write key/value store.

type StoreSupplier

type StoreSupplier func() Store

StoreSupplier instantiates Stores used to create a Stream topology, recreate them or clone a Stream. If further configuration is needed, the store must implement the Initializer interface in order to initialize itself before the Stream start and access configuration parameters through the provided context.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents an unbounded, continuously updating data set. It contains a topology defining the data processing to be done. A Stream can have multiple concurrent tasks over the same processor topology. Stream Sources, Processors and Sinks must be safe for concurrent use.

func (*Stream) Close

func (s *Stream) Close() (err error)

Close closes the stream. It closes all stream sources and their tasks in parallel, closes all processors sequentially if their context is deactivated, then closes all sink processors and finally all stores.

func (*Stream) Start

func (s *Stream) Start() (err error)

Start initializes the stores, sources, processors and sinks within the topology and starts the stream.

func (*Stream) Store

func (s *Stream) Store(name string) (store ROStore, err error)

Store returns the store with the given name

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a Record.

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

Encode serializes the encoder data as []byte
