processing

package
v0.19.0-rc.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2024 License: Apache-2.0 Imports: 8 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
var REALM = ocmlog.DefineSubRealm("output processing chains", "processing")

Functions

func Identity

func Identity(e interface{}) interface{}

func IdentityMapper

func IdentityMapper(e interface{}) interface{}

func ValueIterator

func ValueIterator(i ProcessingIterator) data.Iterator

Types

type AggregationFunction

type AggregationFunction func(e, aggr interface{}) interface{}

type BufferCreator

type BufferCreator func(log logging.Context) ProcessingBuffer

type BufferFrame

type BufferFrame interface {
	Lock()
	Unlock()
	Broadcast()
	Wait()

	IsClosed() bool
}

type BufferImplementation

type BufferImplementation interface {
	Add(e ProcessingEntry) bool
	Open()
	Close()
	Len() int
	Get(i int) interface{}

	ProcessingIterable

	SetFrame(frame BufferFrame)
}

type CheckNext

type CheckNext interface {
	CheckNext() bool
}

type CompareFunction

type CompareFunction = data.CompareFunction

type CompareIndexedFunction

type CompareIndexedFunction = data.CompareIndexedFunction

type ExplodeFunction

type ExplodeFunction func(interface{}) []interface{}

type FilterFunction

type FilterFunction func(interface{}) bool

type IncrementalProcessingSource

type IncrementalProcessingSource interface {
	data.Iterable
	Open()
	Add(e ...interface{}) IncrementalProcessingSource
	Close()
}

type Index

type Index = IndexArray

func Top

func Top(i int) Index

type IndexArray

type IndexArray []int

func (IndexArray) After

func (i IndexArray) After(o IndexArray) bool

func (IndexArray) Copy

func (i IndexArray) Copy() IndexArray

func (IndexArray) Next

func (i IndexArray) Next(maxIndex, sub int) IndexArray

func (IndexArray) Validate

func (i IndexArray) Validate(maxIndex int)

type MappingFunction

type MappingFunction data.MappingFunction

func MappingSequence added in v0.16.0

func MappingSequence(mapper ...MappingFunction) MappingFunction

type ProcessChain

type ProcessChain interface {
	Transform(t TransformFunction) ProcessChain
	Explode(m ExplodeFunction) ProcessChain
	Map(m MappingFunction) ProcessChain
	Filter(f FilterFunction) ProcessChain
	Sort(c CompareFunction) ProcessChain
	WithPool(p ProcessorPool) ProcessChain
	Unordered() ProcessChain
	Parallel(n int) ProcessChain
	Append(p ProcessChain) ProcessChain

	Process(data data.Iterable) ProcessingResult
}

ProcessChain is a data structure holding a chain definition, which is a chain of step creation functions used to instantiate the chain for a dedicated input processing. The instantiation is initiated by calling the Process method on a chain.

func Append

func Append(chain, add ProcessChain, conditions ...options.Condition) ProcessChain

func Chain

func Chain(log logging.Context) ProcessChain

func Explode

func Explode(e ExplodeFunction) ProcessChain

func Filter

func Filter(f FilterFunction) ProcessChain

func Map

func Parallel

func Parallel(n int) ProcessChain

func Sort

func Transform

func Transform(t TransformFunction) ProcessChain

func Unordered

func Unordered() ProcessChain

func WithPool

func WithPool(pool ProcessorPool) ProcessChain

type ProcessingBuffer

type ProcessingBuffer interface {
	Add(e ProcessingEntry) ProcessingBuffer
	Len() int
	Get(int) interface{}
	Open()
	Close()

	ProcessingIterable

	IsClosed() bool
}

func NewOrderedBuffer

func NewOrderedBuffer(log logging.Context) ProcessingBuffer

func NewSimpleBuffer

func NewSimpleBuffer(log logging.Context) ProcessingBuffer

type ProcessingEntry

type ProcessingEntry struct {
	Index    Index
	MaxIndex int
	MaxSub   int
	Valid    bool
	Value    interface{}
}

func NewEntry

func NewEntry(i Index, v interface{}, opts ...interface{}) ProcessingEntry

type ProcessingIterable

type ProcessingIterable interface {
	ProcessingIterator() ProcessingIterator
	Iterator() data.Iterator
}

func NewEntryIterableFromIterable

func NewEntryIterableFromIterable(log logging.Context, data data.Iterable) ProcessingIterable

type ProcessingIterator

type ProcessingIterator interface {
	HasNext() bool
	NextProcessingEntry() ProcessingEntry
}

type ProcessingResult

func Process

func Process(log logging.Context, data data.Iterable) ProcessingResult

Process processes an initial empty chain by converting an iterable into a ProcessingResult.

type ProcessingSource

type ProcessingSource interface {
	IncrementalProcessingSource
}

func NewAsyncProcessingSource

func NewAsyncProcessingSource(log logging.Context, f func() data.Iterable, pool ProcessorPool) ProcessingSource

func NewIncrementalProcessingSource

func NewIncrementalProcessingSource(log logging.Context) ProcessingSource

type ProcessorPool

type ProcessorPool interface {
	Request()
	Release()
	Exec(func())
}

func NewProcessorPool

func NewProcessorPool(n int) ProcessorPool

func NewUnlimitedProcessorPool

func NewUnlimitedProcessorPool() ProcessorPool

type SubEntries

type SubEntries int

type TransformFunction

type TransformFunction = func(iterable data.Iterable) data.Iterable

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL