Documentation ¶
Overview ¶
Package operator implements streaming operators
Index ¶
- Constants
- func Name(op Operator) string
- type BaseIn
- type BaseInOutOp
- type BaseOut
- type BatchContainer
- type BatcherOperator
- type Chain
- type ChannelFanoutOperator
- type DistribKey
- type DistributeOperator
- type DistributorChildOp
- type FailFastRunner
- type FailSilentRunner
- func (r *FailSilentRunner) Add(op Operator)
- func (r *FailSilentRunner) AsyncRun(op Operator)
- func (r *FailSilentRunner) AsyncRunAll()
- func (r *FailSilentRunner) HardStop()
- func (r *FailSilentRunner) Operators() []Operator
- func (c *FailSilentRunner) Run() error
- func (r *FailSilentRunner) SetErrorHandler(handler func(error))
- func (r *FailSilentRunner) SetFinishedHandler(handler func())
- func (c *FailSilentRunner) SetName(name string)
- func (r *FailSilentRunner) SetOpCloseHandler(handler func(Operator, error))
- func (c *FailSilentRunner) Stop() error
- func (r *FailSilentRunner) Wait() error
- func (r *FailSilentRunner) WaitGroup() *sync.WaitGroup
- type FaninOperator
- type FanoutOperator
- type HardStopChannelCloser
- type In
- type InChain
- type InOutOperator
- type InterfaceContainer
- type Object
- type Operator
- type OrderedChain
- type Out
- type ParallelizableOperator
- type ProcessedNotifier
- type Runner
- type RunningCount
- type SimpleChain
- func (c *SimpleChain) Add(o Operator) Chain
- func (c *SimpleChain) NewSubChain() Chain
- func (c *SimpleChain) Operators() []Operator
- func (c *SimpleChain) Run() error
- func (c *SimpleChain) SetName(name string) Chain
- func (c *SimpleChain) SoftStop() error
- func (c *SimpleChain) Start() error
- func (c *SimpleChain) Stop() error
- func (c *SimpleChain) Wait() error
- type SimpleProcessedNotifier
Constants ¶
const CHAN_SLACK = 100
This is the default channel slack operators should use when creating output channels
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseIn ¶
type BaseIn struct {
// contains filtered or unexported fields
}
func (*BaseIn) GetInDepth ¶
type BaseInOutOp ¶
type BaseInOutOp struct { *HardStopChannelCloser *BaseIn *BaseOut }
func NewBaseInOutOp ¶
func NewBaseInOutOp(slack int) *BaseInOutOp
type BaseOut ¶
type BaseOut struct {
// contains filtered or unexported fields
}
func NewBaseOut ¶
func (*BaseOut) CloseOutput ¶
func (o *BaseOut) CloseOutput()
func (*BaseOut) SetCloseOnExit ¶
type BatchContainer ¶
type BatcherOperator ¶
type BatcherOperator struct { *HardStopChannelCloser *BaseIn *BaseOut MaxOutstanding uint // contains filtered or unexported fields }
func NewBatchOperator ¶
func NewBatchOperator(name string, container BatchContainer, processedDownstream ProcessedNotifier) *BatcherOperator
func NewInterfaceBatchOp ¶
func NewInterfaceBatchOp(pn ProcessedNotifier) *BatcherOperator
func (*BatcherOperator) DownstreamCanAcceptFlush ¶
func (op *BatcherOperator) DownstreamCanAcceptFlush() bool
INVARIANT CAN FLUSH OR WAITING: DownstreamCanAcceptFlush || DownstreamWillCallback
func (*BatcherOperator) DownstreamWillCallback ¶
func (op *BatcherOperator) DownstreamWillCallback() bool
func (*BatcherOperator) Flush ¶
func (op *BatcherOperator) Flush()
func (*BatcherOperator) LastFlush ¶
func (op *BatcherOperator) LastFlush()
func (*BatcherOperator) Run ¶
func (op *BatcherOperator) Run() error
func (*BatcherOperator) SetTimeouts ¶
func (op *BatcherOperator) SetTimeouts(td time.Duration)
type Chain ¶
type Chain interface { Operators() []Operator Run() error Stop() error Add(o Operator) Chain SetName(string) Chain //NewSubChain creates a new empty //chain inheriting the properties of the parent chain //Usefull for distribute/fanout building functions NewSubChain() Chain //async functions Start() error Wait() error }
type ChannelFanoutOperator ¶
type ChannelFanoutOperator struct { *HardStopChannelCloser *BaseIn // contains filtered or unexported fields }
ChannelFanoutOperator does a fanout of info across a set of downstream channels. acts synchronously in that it will write to all downstreams before reading the next upstream channel.
func NewChannelFanoutOp ¶
func NewChannelFanoutOp() *ChannelFanoutOperator
func (*ChannelFanoutOperator) Add ¶
func (op *ChannelFanoutOperator) Add(newChannel chan Object)
func (*ChannelFanoutOperator) Run ¶
func (op *ChannelFanoutOperator) Run() error
type DistribKey ¶
type DistribKey interface{}
type DistributeOperator ¶
type DistributeOperator struct { *HardStopChannelCloser *BaseIn // contains filtered or unexported fields }
func NewDistributor ¶
func NewDistributor(mapp func(Object) DistribKey, creator func(DistribKey) (DistributorChildOp, bool)) *DistributeOperator
func (*DistributeOperator) Run ¶
func (op *DistributeOperator) Run() error
type DistributorChildOp ¶
type FailFastRunner ¶
type FailFastRunner struct { *FailSilentRunner // contains filtered or unexported fields }
func NewFailFastRunner ¶
func NewFailFastRunner() *FailFastRunner
func (*FailFastRunner) SetErrorHandler ¶
func (t *FailFastRunner) SetErrorHandler(h func(error))
type FailSilentRunner ¶
type FailSilentRunner struct { Name string // contains filtered or unexported fields }
func NewFailSilentRunner ¶
func NewFailSilentRunner() *FailSilentRunner
func (*FailSilentRunner) Add ¶
func (r *FailSilentRunner) Add(op Operator)
func (*FailSilentRunner) AsyncRun ¶
func (r *FailSilentRunner) AsyncRun(op Operator)
func (*FailSilentRunner) AsyncRunAll ¶
func (r *FailSilentRunner) AsyncRunAll()
func (*FailSilentRunner) HardStop ¶
func (r *FailSilentRunner) HardStop()
func (*FailSilentRunner) Operators ¶
func (r *FailSilentRunner) Operators() []Operator
func (*FailSilentRunner) Run ¶
func (c *FailSilentRunner) Run() error
func (*FailSilentRunner) SetErrorHandler ¶
func (r *FailSilentRunner) SetErrorHandler(handler func(error))
handler will be called on each error
func (*FailSilentRunner) SetFinishedHandler ¶
func (r *FailSilentRunner) SetFinishedHandler(handler func())
handler will be called when all ops exited
func (*FailSilentRunner) SetName ¶
func (c *FailSilentRunner) SetName(name string)
func (*FailSilentRunner) SetOpCloseHandler ¶
func (r *FailSilentRunner) SetOpCloseHandler(handler func(Operator, error))
handler to be called when an op exits
func (*FailSilentRunner) Stop ¶
func (c *FailSilentRunner) Stop() error
func (*FailSilentRunner) Wait ¶
func (r *FailSilentRunner) Wait() error
func (*FailSilentRunner) WaitGroup ¶
func (r *FailSilentRunner) WaitGroup() *sync.WaitGroup
type FaninOperator ¶
type FaninOperator struct { *HardStopChannelCloser // contains filtered or unexported fields }
func NewFaninOp ¶
func NewFaninOp() *FaninOperator
func (*FaninOperator) AddSrc ¶
func (op *FaninOperator) AddSrc(newOp faninSrcOp)
func (*FaninOperator) Out ¶
func (op *FaninOperator) Out() chan Object
func (*FaninOperator) Run ¶
func (op *FaninOperator) Run() error
func (*FaninOperator) SetDest ¶
func (op *FaninOperator) SetDest(newOp faninDestOp)
func (*FaninOperator) SetOut ¶
func (o *FaninOperator) SetOut(c chan Object)
func (*FaninOperator) Stop ¶
func (op *FaninOperator) Stop() error
type FanoutOperator ¶
type FanoutOperator struct { *HardStopChannelCloser *BaseIn // contains filtered or unexported fields }
func NewFanoutOp ¶
func NewFanoutOp() *FanoutOperator
func (*FanoutOperator) Add ¶
func (op *FanoutOperator) Add(newOp fanoutChildOp)
func (*FanoutOperator) Run ¶
func (op *FanoutOperator) Run() error
type HardStopChannelCloser ¶
type HardStopChannelCloser struct {
StopNotifier chan bool
}
func NewHardStopChannelCloser ¶
func NewHardStopChannelCloser() *HardStopChannelCloser
func (*HardStopChannelCloser) Stop ¶
func (op *HardStopChannelCloser) Stop() error
type InChain ¶
func NewInChainWrapper ¶
type InOutOperator ¶
type InterfaceContainer ¶
type InterfaceContainer struct {
// contains filtered or unexported fields
}
func NewInterfaceContainer ¶
func NewInterfaceContainer() *InterfaceContainer
func (*InterfaceContainer) Add ¶
func (c *InterfaceContainer) Add(obj Object)
func (*InterfaceContainer) Flush ¶
func (c *InterfaceContainer) Flush(out chan<- Object) bool
func (*InterfaceContainer) FlushAll ¶
func (c *InterfaceContainer) FlushAll(out chan<- Object) bool
func (*InterfaceContainer) HasItems ¶
func (c *InterfaceContainer) HasItems() bool
func (*InterfaceContainer) IsFull ¶
func (c *InterfaceContainer) IsFull() bool
type Operator ¶
type Operator interface { // Run runs the operation of the stream. It should never return before all the goroutines it started have quit // It should return in 3 cases: // i) on error (returning the error) // ii) on soft close (Its input has closed the channel, return nil) // iii) on hard close (Stop was called on the operator, return nil) Run() error // Stop force a hard close of the stream. Look at HardStopChannelCloser for a possible implementation. Should be thread-safe // Stop will probably be called once but should be idempotent. It can be called before or after run exits. If called after, is a no-op Stop() error }
soft stops are created by closing the input channel
type OrderedChain ¶
type OrderedChain struct {
*SimpleChain
}
func NewOrderedChain ¶
func NewOrderedChain() *OrderedChain
func (*OrderedChain) Add ¶
func (c *OrderedChain) Add(o Operator) Chain
func (*OrderedChain) NewSubChain ¶
func (c *OrderedChain) NewSubChain() Chain
type ParallelizableOperator ¶
type ParallelizableOperator interface { Operator IsParallel() bool IsOrdered() bool MakeOrdered() ParallelizableOperator }
type ProcessedNotifier ¶
type ProcessedNotifier interface { NotificationChannel() <-chan uint Notify(count uint) Blocking() bool }
func NewNonBlockingProcessedNotifier ¶
func NewNonBlockingProcessedNotifier(slack int) ProcessedNotifier
func NewProcessedNotifier ¶
func NewProcessedNotifier() ProcessedNotifier
type Runner ¶
type Runner interface { SetName(name string) WaitGroup() *sync.WaitGroup Wait() error Operators() []Operator AsyncRun(op Operator) Add(op Operator) AsyncRunAll() HardStop() /* operator compat */ Run() error Stop() error /* handler will be called on each error */ SetErrorHandler(handler func(error)) /* handler will be called when all ops exited */ SetFinishedHandler(handler func()) /* handler to be called when an op exits */ SetOpCloseHandler(handler func(Operator, error)) }
type RunningCount ¶
type RunningCount struct {
// contains filtered or unexported fields
}
func NewRunningCount ¶
func NewRunningCount(sz int) *RunningCount
func (*RunningCount) Add ¶
func (rc *RunningCount) Add(i int)
func (*RunningCount) GetAverage ¶
func (rc *RunningCount) GetAverage() int
func (*RunningCount) GetAverageMin ¶
func (rc *RunningCount) GetAverageMin(min int) int
type SimpleChain ¶
type SimpleChain struct { Name string // contains filtered or unexported fields }
A SimpleChain implements the operator interface too!
func NewChain ¶
func NewChain() *SimpleChain
func NewSimpleChain ¶
func NewSimpleChain() *SimpleChain
func (*SimpleChain) Add ¶
func (c *SimpleChain) Add(o Operator) Chain
func (*SimpleChain) NewSubChain ¶
func (c *SimpleChain) NewSubChain() Chain
func (*SimpleChain) Operators ¶
func (c *SimpleChain) Operators() []Operator
func (*SimpleChain) SetName ¶
func (c *SimpleChain) SetName(name string) Chain
func (*SimpleChain) SoftStop ¶
func (c *SimpleChain) SoftStop() error
func (*SimpleChain) Start ¶
func (c *SimpleChain) Start() error
func (*SimpleChain) Stop ¶
func (c *SimpleChain) Stop() error
A stop is a hard stop as per the Operator interface
func (*SimpleChain) Wait ¶
func (c *SimpleChain) Wait() error
type SimpleProcessedNotifier ¶
type SimpleProcessedNotifier struct { Block bool // contains filtered or unexported fields }
func (*SimpleProcessedNotifier) Blocking ¶
func (n *SimpleProcessedNotifier) Blocking() bool
func (*SimpleProcessedNotifier) NotificationChannel ¶
func (n *SimpleProcessedNotifier) NotificationChannel() <-chan uint
func (*SimpleProcessedNotifier) Notify ¶
func (n *SimpleProcessedNotifier) Notify(count uint)