streams

package
v0.0.0-...-4f0d0f2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2021 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New() (Readable, Writable)

func NewCollection

func NewCollection(size int) (ReadableCollection, WritableCollection)

func NewGrouped

func NewGrouped() (GroupedReadable, GroupedWritable)

Types

type Bus

type Bus interface {
	Subscriber
	Publisher
	Start() <-chan bool
}

type FilterFunc

type FilterFunc func(msg T) bool

type FlatMapFunc

type FlatMapFunc func(msg T) []interface{}

type GroupByFunc

type GroupByFunc func(msg T) ([]string, []interface{})

type GroupedReadable

type GroupedReadable <-chan *GroupedT

func GroupBy

func GroupBy(in Readable, fn GroupByFunc) GroupedReadable

func (GroupedReadable) Reduce

func (gr GroupedReadable) Reduce(fn ReduceFunc, accumulator T) Readable

type GroupedT

type GroupedT struct {
	PartitionKey *PartitionKey
	Stream       Readable
	// contains filtered or unexported fields
}

type GroupedWritable

type GroupedWritable chan<- *GroupedT

func (GroupedWritable) Close

func (w GroupedWritable) Close()

type MapFunc

type MapFunc func(msg T) T

type PartitionKey

type PartitionKey struct {
	// contains filtered or unexported fields
}

func NewPartitionKey

func NewPartitionKey() *PartitionKey

func (*PartitionKey) Add

func (pk *PartitionKey) Add(key string, value interface{})

func (*PartitionKey) FormatKey

func (pk *PartitionKey) FormatKey() string

func (*PartitionKey) Get

func (pk *PartitionKey) Get(key string) (interface{}, bool)

func (*PartitionKey) GetKeys

func (pk *PartitionKey) GetKeys() []string

func (*PartitionKey) GetValues

func (pk *PartitionKey) GetValues() []interface{}

func (*PartitionKey) Update

func (pk *PartitionKey) Update(key string, value interface{})

type Publisher

type Publisher interface {
	Publish(topic string, stream Readable) error
}

type Readable

type Readable <-chan T

func Combine

func Combine(streams ReadableCollection) Readable

func Filter

func Filter(in Readable, fn FilterFunc) Readable

func FlatMap

func FlatMap(in Readable, fn FlatMapFunc) Readable

func Map

func Map(in Readable, fn MapFunc) Readable

func NewFrom

func NewFrom(slice ...interface{}) Readable

func NewFromRange

func NewFromRange(lowerBound, upperBound int) Readable

func NewFromValue

func NewFromValue(val interface{}) Readable

func Reduce

func Reduce(in Readable, fn ReduceFunc, accumulator T) Readable

func Transform

func Transform(in Readable, fn TransformFunc) Readable

func (Readable) Drain

func (r Readable) Drain()

func (Readable) Filter

func (r Readable) Filter(fn FilterFunc) Readable

func (Readable) FlatMap

func (r Readable) FlatMap(fn FlatMapFunc) Readable

func (Readable) GroupBy

func (r Readable) GroupBy(fn GroupByFunc) GroupedReadable

func (Readable) Map

func (r Readable) Map(fn MapFunc) Readable

func (Readable) Reduce

func (r Readable) Reduce(fn ReduceFunc, accumulator T) Readable

func (Readable) Split

func (r Readable) Split(streams int) ReadableCollection

func (Readable) Transform

func (r Readable) Transform(fn TransformFunc) Readable

type ReadableCollection

type ReadableCollection []Readable

func Split

func Split(streamCount int, stream Readable) ReadableCollection

func (ReadableCollection) Combine

func (rc ReadableCollection) Combine() Readable

type ReduceFunc

type ReduceFunc func(accumulator T, msg T) T

type StreamPersister

type StreamPersister interface {
	Register(name string, objTemplate interface{}) error
	Persist(name string, stream Readable) error
}

func NewNoOpStreamPersister

func NewNoOpStreamPersister() StreamPersister

type SubscribeFunc

type SubscribeFunc func(p Publisher, stream Readable)

type Subscriber

type Subscriber interface {
	Subscribe(topics []string, fn SubscribeFunc) error
}

type T

type T interface{}

type TransformFunc

type TransformFunc func(msg T, out Writable)

type Writable

type Writable chan<- T

func (Writable) Close

func (w Writable) Close()

type WritableCollection

type WritableCollection []Writable

func (WritableCollection) Close

func (wc WritableCollection) Close()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL