Documentation ¶
Index ¶
- Variables
- type ByteEncoder
- type Closer
- type Config
- func (c Config) Array() (value []Config)
- func (c Config) Bool(def bool) (value bool)
- func (c Config) Duration(def time.Duration) (value time.Duration)
- func (c Config) Float64(def float64) (value float64)
- func (c Config) Get(path ...string) (config Config)
- func (c Config) Int(def int) (value int)
- func (c Config) Int64(def int64) (value int64)
- func (c Config) IsSet(path ...string) (ok bool)
- func (c Config) Map() (value map[string]Config)
- func (c Config) Set(value interface{}, path ...string)
- func (c Config) String(def string) (value string)
- func (c Config) Time(def time.Time) (value time.Time)
- func (c Config) Uint(def uint) (value uint)
- func (c Config) Uint64(def uint64) (value uint64)
- type Encoder
- type Error
- type Initializer
- type Node
- type Processor
- type ProcessorContext
- type ProcessorFunc
- type ProcessorSupplier
- type ROStore
- type Record
- type Remover
- type Source
- type SourceSupplier
- type Store
- type StoreSupplier
- type Stream
- type StringEncoder
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrInvalidForward is returned when forwarding from a non-active context,
	// i.e. outside a processor.Process() call, forwarding from a processor with
	// no successors, or forwarding from a sink processor. A source processor
	// context is always active.
	ErrInvalidForward = errors.New("invalid forward")

	// ErrStoreNotFound is returned when the requested store
	// doesn't exist in the Stream topology.
	ErrStoreNotFound = errors.New("store not found")
)
var (
	// ErrKeyNotFound is returned when a key is not found on a get from the store.
	ErrKeyNotFound = errors.New("key not found")
)
Functions ¶
This section is empty.
Types ¶
type ByteEncoder ¶
type ByteEncoder []byte
ByteEncoder implements the Encoder interface for byte slices so that they can be used as the Key or Value in a Record.
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
Encode serializes the encoder data as []byte
type Closer ¶
type Closer interface {
Close() (err error)
}
Closer interface. Any Processor or Store that must be closed on Stream termination must implement this interface.
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config is a configuration object safe for concurrent gets but not for sets. Configuration items are specified by a path of dot-separated names, for both setting and getting configuration values. Valid paths:
	a
	a.nest.key
	a.nest.key.array.#       for set, to append to an array
	a.nest.key.array.#.key   for set, to append a nested element to an array
	a.nest.key.array.2       for set or get of the 3rd element of an array
	a.nest.key.array.2.key   for set or get of a nested element in the 3rd element of an array
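To illustrate how a dot-separated path with numeric array indexes can be resolved, here is a rough sketch over a plain nested map. It is not the package's implementation: getPath is a hypothetical helper, it covers only the get side, and it does not handle the # append marker.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// getPath is a hypothetical helper (not part of the package) that walks a
// nested structure of maps and slices following a dot-separated path.
func getPath(root interface{}, path string) (interface{}, bool) {
	cur := root
	for _, part := range strings.Split(path, ".") {
		switch node := cur.(type) {
		case map[string]interface{}:
			next, ok := node[part]
			if !ok {
				return nil, false
			}
			cur = next
		case []interface{}:
			// Inside an array the path segment must be a valid index.
			i, err := strconv.Atoi(part)
			if err != nil || i < 0 || i >= len(node) {
				return nil, false
			}
			cur = node[i]
		default:
			// A scalar has no children, so the path cannot continue.
			return nil, false
		}
	}
	return cur, true
}

func main() {
	cfg := map[string]interface{}{
		"a": map[string]interface{}{
			"nest": map[string]interface{}{
				"key": map[string]interface{}{
					"array": []interface{}{
						"x", "y", map[string]interface{}{"key": "z"},
					},
				},
			},
		},
	}
	// Index 2 addresses the 3rd element, as in the path list above.
	v, ok := getPath(cfg, "a.nest.key.array.2.key")
	fmt.Println(v, ok)
}
```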
func NewConfig ¶
NewConfig creates a config from an existing map[string]interface{}, or an empty Config if nil is provided.
func (Config) Array ¶
Array returns the config array for the given path, returns nil if path is not found or value is not an array.
func (Config) Bool ¶
Bool returns the bool value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a bool value.
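The default-on-nil-or-parse-failure rule shared by the typed getters can be pictured with the sketch below. It is an illustration only: boolOr is a hypothetical function, and the set of accepted input types (bool and string) is an assumption, not a statement about what Config.Bool actually accepts.

```go
package main

import (
	"fmt"
	"strconv"
)

// boolOr is a hypothetical stand-in for the documented behavior: it returns
// def only when the item is nil or cannot be parsed as a bool.
func boolOr(item interface{}, def bool) bool {
	switch v := item.(type) {
	case nil:
		return def
	case bool:
		return v
	case string:
		if b, err := strconv.ParseBool(v); err == nil {
			return b
		}
	}
	// Unsupported type or unparsable string: fall back to the default.
	return def
}

func main() {
	fmt.Println(boolOr("false", true)) // parsed value wins over the default
	fmt.Println(boolOr(nil, true))     // nil item yields the default
	fmt.Println(boolOr("maybe", true)) // parse failure yields the default
}
```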
func (Config) Duration ¶
Duration returns the time.Duration value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a time.Duration value.
func (Config) Float64 ¶
Float64 returns the float64 value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a float64 value.
func (Config) Get ¶
Get retrieves the config item for the given path. The path can be specified as a dot-separated key structure or as a variadic list of keys representing the path within the config.
func (Config) Int ¶
Int returns the int value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to an int value.
func (Config) Int64 ¶
Int64 returns the int64 value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to an int64 value.
func (Config) IsSet ¶
IsSet returns true if the path is set. The path can be a dot-separated key structure or a variadic list of keys representing the path within the config.
func (Config) Map ¶
Map returns the config map for the given path, returns nil if path is not found or value is not an object.
func (Config) String ¶
String returns the string value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a string value.
func (Config) Time ¶
Time returns the time.Time value for the current Config item or a provided default. The default value is only returned if the current Config item is nil or if it fails to parse the current item to a time.Time value.
type Encoder ¶
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Record.
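As a sketch of how a custom type might satisfy Encoder: the interface body is not shown on this page, so the single-method shape below is an assumption inferred from the Encode() ([]byte, error) method on ByteEncoder and StringEncoder. JSONEncoder is a hypothetical type introduced only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder is ASSUMED to have this shape, inferred from the Encode method
// documented for ByteEncoder and StringEncoder.
type Encoder interface {
	Encode() ([]byte, error)
}

// StringEncoder mirrors the documented type so the sketch is self-contained.
type StringEncoder string

// Encode returns the string's bytes.
func (s StringEncoder) Encode() ([]byte, error) { return []byte(s), nil }

// JSONEncoder is a hypothetical encoder that serializes any value as JSON.
type JSONEncoder struct {
	V interface{}
}

// Encode marshals the wrapped value with encoding/json.
func (j JSONEncoder) Encode() ([]byte, error) { return json.Marshal(j.V) }

func main() {
	// Either encoder could serve as a record key or value.
	encoders := []Encoder{
		StringEncoder("user-1"),
		JSONEncoder{V: map[string]int{"clicks": 3}},
	}
	for _, e := range encoders {
		b, err := e.Encode()
		if err != nil {
			fmt.Println("encode failed:", err)
			continue
		}
		fmt.Printf("%s\n", b)
	}
}
```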
type Initializer ¶
type Initializer interface {
Init(pc ProcessorContext) (err error)
}
Initializer interface. Any Processor or Store that must be initialized before running tasks in the stream must implement this interface.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node of a topology. Can be a source, sink, or processor node.
func (*Node) Predecessors ¶
Predecessors of node
type Processor ¶
type Processor interface {
Process(pc ProcessorContext, record Record)
}
Processor of records in a Stream. Both processors and sinks must implement this interface.
type ProcessorContext ¶
type ProcessorContext interface {
	// NodeName returns the current node name.
	NodeName() (name string)
	// StreamName returns the stream name.
	StreamName() (name string)
	// Config returns the stream app configuration.
	Config() (config Config)
	// IsActive reports whether this context is active and can forward records to the stream.
	IsActive() (active bool)
	// Store returns the store with the given name.
	Store(name string) (store Store, err error)
	// Forward the record to the downstream processors. Can be called multiple times
	// within Processor.Process() in order to send correlated or windowed records.
	Forward(record Record) (err error)
	// ForwardTo is like Forward, but it forwards the record only to the given node.
	ForwardTo(to string, record Record) (err error)
	// Error emits an error event to be handled by the Stream.
	Error(err error, records ...Record)
}
ProcessorContext is an execution context within a stream. It provides stream, task and processor information, routing of records to child processors, access to configured stores and contextual logging.
type ProcessorFunc ¶
type ProcessorFunc func(pc ProcessorContext, record Record)
ProcessorFunc implements a Processor for a function type
func (ProcessorFunc) Process ¶
func (f ProcessorFunc) Process(pc ProcessorContext, record Record)
Process the given record
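The function-adapter pattern behind ProcessorFunc can be sketched as follows. The types are trimmed-down local stand-ins, not the package's definitions: ProcessorContext keeps only Forward and Error, the Encoder shape is assumed, and both collector and upper are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// Trimmed-down stand-ins for the documented types (assumptions, see lead-in).
type Encoder interface{ Encode() ([]byte, error) }

type StringEncoder string

func (s StringEncoder) Encode() ([]byte, error) { return []byte(s), nil }

type Record struct {
	Topic string
	Key   Encoder
	Value Encoder
}

type ProcessorContext interface {
	Forward(record Record) (err error)
	Error(err error, records ...Record)
}

type Processor interface {
	Process(pc ProcessorContext, record Record)
}

// ProcessorFunc adapts a plain function to the Processor interface.
type ProcessorFunc func(pc ProcessorContext, record Record)

func (f ProcessorFunc) Process(pc ProcessorContext, record Record) { f(pc, record) }

// collector is a hypothetical context that records what was forwarded.
type collector struct {
	out  []Record
	errs []error
}

func (c *collector) Forward(r Record) error         { c.out = append(c.out, r); return nil }
func (c *collector) Error(err error, _ ...Record)   { c.errs = append(c.errs, err) }

// upper is a plain function that upper-cases the value and forwards it.
func upper(pc ProcessorContext, r Record) {
	b, err := r.Value.Encode()
	if err != nil {
		pc.Error(err, r)
		return
	}
	r.Value = StringEncoder(strings.ToUpper(string(b)))
	if err := pc.Forward(r); err != nil {
		pc.Error(err, r)
	}
}

func main() {
	var p Processor = ProcessorFunc(upper) // no struct type needed
	c := &collector{}
	p.Process(c, Record{Topic: "words", Value: StringEncoder("hello")})
	b, _ := c.out[0].Value.Encode()
	fmt.Println(string(b))
}
```

The conversion ProcessorFunc(upper) is what lets a stateless function be used wherever a Processor is expected.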
type ProcessorSupplier ¶
type ProcessorSupplier func() Processor
ProcessorSupplier instantiates the Processors used to create a Stream topology, recreate them or clone a Stream. If further configuration is needed, the processor must implement the Initializer interface in order to initialize itself before the Stream starts and access configuration parameters through the provided context.
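The sketch below shows one way a supplier, Initializer and Closer could fit together. It makes several assumptions: the interfaces are reduced local copies (ProcessorContext keeps only NodeName), counter and fakeContext are hypothetical, and the Init, Process, Close call order in main is only a guess at what a Stream might do, not its documented lifecycle.

```go
package main

import "fmt"

// Reduced local copies of the documented interfaces (assumptions, see lead-in).
type Record struct{ Topic string }

type ProcessorContext interface {
	NodeName() (name string)
}

type Processor interface {
	Process(pc ProcessorContext, record Record)
}

type Initializer interface {
	Init(pc ProcessorContext) (err error)
}

type Closer interface {
	Close() (err error)
}

type ProcessorSupplier func() Processor

// counter is a hypothetical processor holding per-instance state.
type counter struct {
	name   string
	seen   int
	closed bool
}

func (c *counter) Init(pc ProcessorContext) error      { c.name = pc.NodeName(); return nil }
func (c *counter) Process(pc ProcessorContext, _ Record) { c.seen++ }
func (c *counter) Close() error                        { c.closed = true; return nil }

// fakeContext is a hypothetical context that only knows its node name.
type fakeContext string

func (f fakeContext) NodeName() string { return string(f) }

// newCounter returns a fresh instance on every call, so each task or
// cloned stream gets its own state.
func newCounter() Processor { return &counter{} }

func main() {
	var supply ProcessorSupplier = newCounter
	p := supply()
	pc := fakeContext("count-node")

	// Optional interfaces are discovered with type assertions.
	if in, ok := p.(Initializer); ok {
		if err := in.Init(pc); err != nil {
			fmt.Println("init failed:", err)
			return
		}
	}
	p.Process(pc, Record{Topic: "events"})
	if cl, ok := p.(Closer); ok {
		_ = cl.Close()
	}
	fmt.Printf("%+v\n", *p.(*counter))
}
```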
type ROStore ¶
type ROStore interface {
	// Name returns this store name.
	Name() (name string)
	// Get value for the given key.
	Get(key []byte) (value []byte, err error)
	// Range iterates the store in byte-wise lexicographical sorting order
	// within the given key range applying the callback for the key value pairs.
	// Returning an error causes the iteration to stop.
	// A nil from or to sets the iterator to the beginning or end of Store.
	// Setting both from and to as nil iterates the whole store.
	// The key and value must be treated as immutable and read-only.
	// Key and value bytes remain available only during the callback call and
	// must be copied if outside use is needed.
	Range(from, to []byte, callback func(key, value []byte) error) (err error)
	// RangePrefix iterates the store over a key prefix applying the callback
	// for the key value pairs. Returning an error causes the iteration to stop.
	// The key and value must be treated as immutable and read-only.
	// Key and value bytes remain available only during the callback call and
	// must be copied if outside use is needed.
	RangePrefix(prefix []byte, cb func(key, value []byte) error) (err error)
}
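The Range contract (sorted byte-wise order, nil bounds, stop on error, copy before keeping) can be illustrated with a small in-memory sketch. memStore and errStop are hypothetical, and the bound semantics are an assumption: the documentation does not say whether from and to are inclusive, so this sketch treats from as inclusive and to as exclusive.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"sort"
)

// errStop is a hypothetical sentinel a callback returns to end iteration early.
var errStop = errors.New("stop iteration")

// memStore is a hypothetical in-memory stand-in, not a store from the package.
type memStore struct {
	data map[string][]byte
}

// Range visits keys in byte-wise order. ASSUMPTION: from is inclusive and
// to is exclusive; the documented interface does not specify the bounds.
func (m *memStore) Range(from, to []byte, callback func(key, value []byte) error) error {
	keys := make([]string, 0, len(m.data))
	for k := range m.data {
		keys = append(keys, k)
	}
	sort.Strings(keys) // Go compares strings byte-wise
	for _, k := range keys {
		kb := []byte(k)
		if from != nil && bytes.Compare(kb, from) < 0 {
			continue
		}
		if to != nil && bytes.Compare(kb, to) >= 0 {
			break
		}
		if err := callback(kb, m.data[k]); err != nil {
			return err // a callback error stops the iteration
		}
	}
	return nil
}

func main() {
	s := &memStore{data: map[string][]byte{
		"a": []byte("1"), "b": []byte("2"), "c": []byte("3"),
	}}

	// Copy values that must outlive the callback.
	var copied [][]byte
	err := s.Range([]byte("b"), nil, func(key, value []byte) error {
		copied = append(copied, append([]byte(nil), value...))
		return nil
	})
	fmt.Println(len(copied), err)

	// Returning an error ends the scan after the first pair.
	err = s.Range(nil, nil, func(key, value []byte) error { return errStop })
	fmt.Println(errors.Is(err, errStop))
}
```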
ROStore is a read only key/value store
type Record ¶
type Record struct {
	Topic string    // Topic to which this Record is associated
	Key   Encoder   // Record Key
	Value Encoder   // Record Value
	Time  time.Time // Record time
	// contains filtered or unexported fields
}
Record represents a single record within a stream
type Remover ¶
type Remover interface {
Remove() (err error)
}
Remover interface. Any Store that must clear its data or state must implement this interface. Remove must ensure releasing and closing of resources.
type Source ¶
type Source interface {
	Processor
	Consume(pc ProcessorContext)
}
Source is a source of records in a Stream.
type SourceSupplier ¶
type SourceSupplier func() Store
SourceSupplier instantiates the Sources used to create a Stream topology, recreate them or clone a Stream. If further configuration is needed, the source must implement the Initializer interface in order to initialize itself before the Stream starts and access configuration parameters through the provided context.
type Store ¶
type Store interface {
	Processor
	ROStore
	// Set the value for the given key.
	// If TTL is greater than 0 it will set an expiry time for the key.
	Set(key, value []byte) (err error)
	// Delete the given key and associated value
	Delete(key []byte) (err error)
}
Store is a read write key/value store.
type StoreSupplier ¶
type StoreSupplier func() Store
StoreSupplier instantiates the Stores used to create a Stream topology, recreate them or clone a Stream. If further configuration is needed, the store must implement the Initializer interface in order to initialize itself before the Stream starts and access configuration parameters through the provided context.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents an unbounded, continuously updating data set. It contains a topology defining the data processing to be done. A Stream can have multiple concurrent tasks over the same processor topology. Stream Sources, Processors and Sinks must be safe for concurrent use.
func (*Stream) Close ¶
Close the stream. It closes all stream sources and their tasks in parallel, closes all processors sequentially if their context is deactivated, closes all sink processors and finally closes all stores.
type StringEncoder ¶
type StringEncoder string
StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a Record.
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
Encode serializes the encoder data as []byte