Documentation ¶
Index ¶
- func New() (Readable, Writable)
- func NewCollection(size int) (ReadableCollection, WritableCollection)
- func NewGrouped() (GroupedReadable, GroupedWritable)
- type Bus
- type FilterFunc
- type FlatMapFunc
- type GroupByFunc
- type GroupedReadable
- type GroupedT
- type GroupedWritable
- type MapFunc
- type PartitionKey
- func (pk *PartitionKey) Add(key string, value interface{})
- func (pk *PartitionKey) FormatKey() string
- func (pk *PartitionKey) Get(key string) (interface{}, bool)
- func (pk *PartitionKey) GetKeys() []string
- func (pk *PartitionKey) GetValues() []interface{}
- func (pk *PartitionKey) Update(key string, value interface{})
- type Publisher
- type Readable
- func Combine(streams ReadableCollection) Readable
- func Filter(in Readable, fn FilterFunc) Readable
- func FlatMap(in Readable, fn FlatMapFunc) Readable
- func Map(in Readable, fn MapFunc) Readable
- func NewFrom(slice ...interface{}) Readable
- func NewFromRange(lowerBound, upperBound int) Readable
- func NewFromValue(val interface{}) Readable
- func Reduce(in Readable, fn ReduceFunc, accumulator T) Readable
- func Transform(in Readable, fn TransformFunc) Readable
- func (r Readable) Drain()
- func (r Readable) Filter(fn FilterFunc) Readable
- func (r Readable) FlatMap(fn FlatMapFunc) Readable
- func (r Readable) GroupBy(fn GroupByFunc) GroupedReadable
- func (r Readable) Map(fn MapFunc) Readable
- func (r Readable) Reduce(fn ReduceFunc, accumulator T) Readable
- func (r Readable) Split(streams int) ReadableCollection
- func (r Readable) Transform(fn TransformFunc) Readable
- type ReadableCollection
- type ReduceFunc
- type StreamPersister
- type SubscribeFunc
- type Subscriber
- type T
- type TransformFunc
- type Writable
- type WritableCollection
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewCollection ¶
func NewCollection(size int) (ReadableCollection, WritableCollection)
func NewGrouped ¶
func NewGrouped() (GroupedReadable, GroupedWritable)
Types ¶
type Bus ¶
type Bus interface {
	Subscriber
	Publisher
	Start() <-chan bool
}
type FilterFunc ¶
type FlatMapFunc ¶
type FlatMapFunc func(msg T) []interface{}
type GroupByFunc ¶
type GroupedReadable ¶
type GroupedReadable <-chan *GroupedT
func GroupBy ¶
func GroupBy(in Readable, fn GroupByFunc) GroupedReadable
func (GroupedReadable) Reduce ¶
func (gr GroupedReadable) Reduce(fn ReduceFunc, accumulator T) Readable
type GroupedT ¶
type GroupedT struct {
	PartitionKey *PartitionKey
	Stream       Readable
	// contains filtered or unexported fields
}
type GroupedWritable ¶
type GroupedWritable chan<- *GroupedT
func (GroupedWritable) Close ¶
func (w GroupedWritable) Close()
type PartitionKey ¶
type PartitionKey struct {
// contains filtered or unexported fields
}
func NewPartitionKey ¶
func NewPartitionKey() *PartitionKey
func (*PartitionKey) Add ¶
func (pk *PartitionKey) Add(key string, value interface{})
func (*PartitionKey) FormatKey ¶
func (pk *PartitionKey) FormatKey() string
func (*PartitionKey) Get ¶
func (pk *PartitionKey) Get(key string) (interface{}, bool)
func (*PartitionKey) GetKeys ¶
func (pk *PartitionKey) GetKeys() []string
func (*PartitionKey) GetValues ¶
func (pk *PartitionKey) GetValues() []interface{}
func (*PartitionKey) Update ¶
func (pk *PartitionKey) Update(key string, value interface{})
type Readable ¶
type Readable <-chan T
func Combine ¶
func Combine(streams ReadableCollection) Readable
func Filter ¶
func Filter(in Readable, fn FilterFunc) Readable
func FlatMap ¶
func FlatMap(in Readable, fn FlatMapFunc) Readable
func NewFromRange ¶
func NewFromRange(lowerBound, upperBound int) Readable
func NewFromValue ¶
func NewFromValue(val interface{}) Readable
func Transform ¶
func Transform(in Readable, fn TransformFunc) Readable
func (Readable) Filter ¶
func (r Readable) Filter(fn FilterFunc) Readable
func (Readable) FlatMap ¶
func (r Readable) FlatMap(fn FlatMapFunc) Readable
func (Readable) GroupBy ¶
func (r Readable) GroupBy(fn GroupByFunc) GroupedReadable
func (Readable) Split ¶
func (r Readable) Split(streams int) ReadableCollection
func (Readable) Transform ¶
func (r Readable) Transform(fn TransformFunc) Readable
type ReadableCollection ¶
type ReadableCollection []Readable
func Split ¶
func Split(streamCount int, stream Readable) ReadableCollection
func (ReadableCollection) Combine ¶
func (rc ReadableCollection) Combine() Readable
type ReduceFunc ¶
type StreamPersister ¶
type StreamPersister interface {
	Register(name string, objTemplate interface{}) error
	Persist(name string, stream Readable) error
}
func NewNoOpStreamPersister ¶
func NewNoOpStreamPersister() StreamPersister
type SubscribeFunc ¶
type Subscriber ¶
type Subscriber interface {
Subscribe(topics []string, fn SubscribeFunc) error
}
type TransformFunc ¶
type WritableCollection ¶
type WritableCollection []Writable
func (WritableCollection) Close ¶
func (wc WritableCollection) Close()
Click to show internal directories.
Click to hide internal directories.