Documentation ¶
Index ¶
- Variables
- func Identity(e interface{}) interface{}
- func IdentityMapper(e interface{}) interface{}
- func ValueIterator(i ProcessingIterator) data.Iterator
- type AggregationFunction
- type BufferCreator
- type BufferFrame
- type BufferImplementation
- type CheckNext
- type CompareFunction
- type CompareIndexedFunction
- type ExplodeFunction
- type FilterFunction
- type IncrementalProcessingSource
- type Index
- type IndexArray
- type MappingFunction
- type ProcessChain
- func Append(chain, add ProcessChain, conditions ...options.Condition) ProcessChain
- func Chain(log logging.Context) ProcessChain
- func Explode(e ExplodeFunction) ProcessChain
- func Filter(f FilterFunction) ProcessChain
- func Map(m MappingFunction) ProcessChain
- func Parallel(n int) ProcessChain
- func Sort(c CompareFunction) ProcessChain
- func Transform(t TransformFunction) ProcessChain
- func Unordered() ProcessChain
- func WithPool(pool ProcessorPool) ProcessChain
- type ProcessingBuffer
- type ProcessingEntry
- type ProcessingIterable
- type ProcessingIterator
- type ProcessingResult
- type ProcessingSource
- type ProcessorPool
- type SubEntries
- type TransformFunction
Constants ¶
This section is empty.
Variables ¶
View Source
var REALM = ocmlog.DefineSubRealm("output processing chains", "processing")
Functions ¶
func IdentityMapper ¶
func IdentityMapper(e interface{}) interface{}
func ValueIterator ¶
func ValueIterator(i ProcessingIterator) data.Iterator
Types ¶
type AggregationFunction ¶
type AggregationFunction func(e, aggr interface{}) interface{}
type BufferCreator ¶
type BufferCreator func(log logging.Context) ProcessingBuffer
type BufferFrame ¶
type BufferFrame interface { Lock() Unlock() Broadcast() Wait() IsClosed() bool }
type BufferImplementation ¶
type BufferImplementation interface { Add(e ProcessingEntry) bool Open() Close() Len() int Get(i int) interface{} ProcessingIterable SetFrame(frame BufferFrame) }
type CompareFunction ¶
type CompareFunction = data.CompareFunction
type CompareIndexedFunction ¶
type CompareIndexedFunction = data.CompareIndexedFunction
type ExplodeFunction ¶
type ExplodeFunction func(interface{}) []interface{}
type FilterFunction ¶
type FilterFunction func(interface{}) bool
type IncrementalProcessingSource ¶
type IncrementalProcessingSource interface { data.Iterable Open() Add(e ...interface{}) IncrementalProcessingSource Close() }
type IndexArray ¶
type IndexArray []int
func (IndexArray) After ¶
func (i IndexArray) After(o IndexArray) bool
func (IndexArray) Copy ¶
func (i IndexArray) Copy() IndexArray
func (IndexArray) Next ¶
func (i IndexArray) Next(maxIndex, sub int) IndexArray
func (IndexArray) Validate ¶
func (i IndexArray) Validate(maxIndex int)
type MappingFunction ¶
type MappingFunction data.MappingFunction
func MappingSequence ¶ added in v0.16.0
func MappingSequence(mapper ...MappingFunction) MappingFunction
type ProcessChain ¶
type ProcessChain interface { Transform(t TransformFunction) ProcessChain Explode(m ExplodeFunction) ProcessChain Map(m MappingFunction) ProcessChain Filter(f FilterFunction) ProcessChain Sort(c CompareFunction) ProcessChain WithPool(p ProcessorPool) ProcessChain Unordered() ProcessChain Parallel(n int) ProcessChain Append(p ProcessChain) ProcessChain Process(data data.Iterable) ProcessingResult }
ProcessChain is a data structure holding a chain definition, which is a chain of step creation functions used to instantiate the chain for a dedicated input processing. The instantiation is initiated by calling the Process method on a chain.
func Append ¶
func Append(chain, add ProcessChain, conditions ...options.Condition) ProcessChain
func Chain ¶
func Chain(log logging.Context) ProcessChain
func Explode ¶
func Explode(e ExplodeFunction) ProcessChain
func Filter ¶
func Filter(f FilterFunction) ProcessChain
func Map ¶
func Map(m MappingFunction) ProcessChain
func Parallel ¶
func Parallel(n int) ProcessChain
func Sort ¶
func Sort(c CompareFunction) ProcessChain
func Transform ¶
func Transform(t TransformFunction) ProcessChain
func Unordered ¶
func Unordered() ProcessChain
func WithPool ¶
func WithPool(pool ProcessorPool) ProcessChain
type ProcessingBuffer ¶
type ProcessingBuffer interface { Add(e ProcessingEntry) ProcessingBuffer Len() int Get(int) interface{} Open() Close() ProcessingIterable IsClosed() bool }
func NewOrderedBuffer ¶
func NewOrderedBuffer(log logging.Context) ProcessingBuffer
func NewProcessingBuffer ¶
func NewProcessingBuffer(log logging.Context, i BufferImplementation) ProcessingBuffer
func NewSimpleBuffer ¶
func NewSimpleBuffer(log logging.Context) ProcessingBuffer
type ProcessingEntry ¶
func NewEntry ¶
func NewEntry(i Index, v interface{}, opts ...interface{}) ProcessingEntry
type ProcessingIterable ¶
type ProcessingIterable interface { ProcessingIterator() ProcessingIterator Iterator() data.Iterator }
func NewEntryIterableFromIterable ¶
func NewEntryIterableFromIterable(log logging.Context, data data.Iterable) ProcessingIterable
func ValueIterable ¶
func ValueIterable(i ProcessingIterable) ProcessingIterable
type ProcessingIterator ¶
type ProcessingIterator interface { HasNext() bool NextProcessingEntry() ProcessingEntry }
type ProcessingResult ¶
type ProcessingResult interface { data.Iterable Transform(t TransformFunction) ProcessingResult Explode(e ExplodeFunction) ProcessingResult Map(m MappingFunction) ProcessingResult Filter(f FilterFunction) ProcessingResult Sort(c CompareFunction) ProcessingResult Apply(c ProcessChain) ProcessingResult Synchronously() ProcessingResult Asynchronously() ProcessingResult WithPool(ProcessorPool) ProcessingResult Unordered() ProcessingResult Parallel(n int) ProcessingResult AsSlice() data.IndexedSliceAccess }
type ProcessingSource ¶
type ProcessingSource interface { IncrementalProcessingSource }
func NewAsyncProcessingSource ¶
func NewAsyncProcessingSource(log logging.Context, f func() data.Iterable, pool ProcessorPool) ProcessingSource
func NewIncrementalProcessingSource ¶
func NewIncrementalProcessingSource(log logging.Context) ProcessingSource
type ProcessorPool ¶
type ProcessorPool interface { Request() Release() Exec(func()) }
func NewProcessorPool ¶
func NewProcessorPool(n int) ProcessorPool
func NewUnlimitedProcessorPool ¶
func NewUnlimitedProcessorPool() ProcessorPool
type SubEntries ¶
type SubEntries int
Click to show internal directories.
Click to hide internal directories.