Documentation ¶
Overview ¶
Package operator implements streaming operators.
Index ¶
- Constants
- func Name(op Operator) string
- type BaseIn
- type BaseInOutOp
- type BaseOut
- type BatchContainer
- type BatcherOperator
- type Chain
- type DistribKey
- type DistributeOperator
- type DistributorChildOp
- type FanoutOperator
- type HardStopChannelCloser
- type In
- type InChain
- type InOutOperator
- type InterfaceContainer
- type Object
- type Operator
- type OrderedChain
- type Out
- type ParallelizableOperator
- type ProcessedNotifier
- type Runner
- func (r *Runner) Add(op Operator)
- func (r *Runner) AsyncRun(op Operator)
- func (r *Runner) AsyncRunAll()
- func (r *Runner) CloseNotifier() <-chan bool
- func (r *Runner) ErrorChannel() <-chan error
- func (r *Runner) HardStop()
- func (r *Runner) Operators() []Operator
- func (r *Runner) WaitGroup() *sync.WaitGroup
- type RunningCount
- type SimpleChain
- func (c *SimpleChain) Add(o Operator) Chain
- func (c *SimpleChain) NewSubChain() Chain
- func (c *SimpleChain) Operators() []Operator
- func (c *SimpleChain) Run() error
- func (c *SimpleChain) SetName(name string) Chain
- func (c *SimpleChain) SoftStop() error
- func (c *SimpleChain) Start() error
- func (c *SimpleChain) Stop() error
- func (c *SimpleChain) Wait() error
- type SimpleProcessedNotifier
Constants ¶
View Source
const CHAN_SLACK = 100
CHAN_SLACK is the default channel slack that operators should use when creating their output channels.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseIn ¶
type BaseIn struct {
// contains filtered or unexported fields
}
func (*BaseIn) GetInDepth ¶
type BaseInOutOp ¶
type BaseInOutOp struct { *HardStopChannelCloser *BaseIn *BaseOut }
func NewBaseInOutOp ¶
func NewBaseInOutOp(slack int) *BaseInOutOp
type BatchContainer ¶
type BatcherOperator ¶
type BatcherOperator struct { *HardStopChannelCloser *BaseIn *BaseOut MaxOutstanding uint // contains filtered or unexported fields }
func NewBatchOperator ¶
func NewBatchOperator(name string, container BatchContainer, processedDownstream ProcessedNotifier) *BatcherOperator
func NewInterfaceBatchOp ¶
func NewInterfaceBatchOp(pn ProcessedNotifier) *BatcherOperator
func (*BatcherOperator) DownstreamCanAcceptFlush ¶
func (op *BatcherOperator) DownstreamCanAcceptFlush() bool
INVARIANT CAN FLUSH OR WAITING: DownstreamCanAcceptFlush || DownstreamWillCallback
func (*BatcherOperator) DownstreamWillCallback ¶
func (op *BatcherOperator) DownstreamWillCallback() bool
func (*BatcherOperator) Flush ¶
func (op *BatcherOperator) Flush()
func (*BatcherOperator) LastFlush ¶
func (op *BatcherOperator) LastFlush()
func (*BatcherOperator) Run ¶
func (op *BatcherOperator) Run() error
func (*BatcherOperator) SetTimeouts ¶
func (op *BatcherOperator) SetTimeouts(td time.Duration)
type Chain ¶
type Chain interface { Operators() []Operator Run() error Stop() error Add(o Operator) Chain SetName(string) Chain // NewSubChain creates a new empty chain inheriting the properties of the parent chain. // Useful for distribute/fanout building functions. NewSubChain() Chain // async functions Start() error Wait() error }
type DistribKey ¶
type DistribKey interface{}
type DistributeOperator ¶
type DistributeOperator struct { *HardStopChannelCloser *BaseIn // contains filtered or unexported fields }
func NewDistributor ¶
func NewDistributor(mapp func(Object) DistribKey, creator func(DistribKey) DistributorChildOp) *DistributeOperator
func (*DistributeOperator) Run ¶
func (op *DistributeOperator) Run() error
type DistributorChildOp ¶
type FanoutOperator ¶
type FanoutOperator struct { *HardStopChannelCloser *BaseIn // contains filtered or unexported fields }
func NewFanoutOp ¶
func NewFanoutOp() *FanoutOperator
func (*FanoutOperator) Add ¶
func (op *FanoutOperator) Add(newOp fanoutChildOp)
func (*FanoutOperator) Run ¶
func (op *FanoutOperator) Run() error
type HardStopChannelCloser ¶
type HardStopChannelCloser struct {
StopNotifier chan bool
}
func NewHardStopChannelCloser ¶
func NewHardStopChannelCloser() *HardStopChannelCloser
func (*HardStopChannelCloser) Stop ¶
func (op *HardStopChannelCloser) Stop() error
type InChain ¶
func NewInChainWrapper ¶
type InOutOperator ¶
type InterfaceContainer ¶
type InterfaceContainer struct {
// contains filtered or unexported fields
}
func NewInterfaceContainer ¶
func NewInterfaceContainer() *InterfaceContainer
func (*InterfaceContainer) Add ¶
func (c *InterfaceContainer) Add(obj Object)
func (*InterfaceContainer) Flush ¶
func (c *InterfaceContainer) Flush(out chan<- Object) bool
func (*InterfaceContainer) FlushAll ¶
func (c *InterfaceContainer) FlushAll(out chan<- Object) bool
func (*InterfaceContainer) HasItems ¶
func (c *InterfaceContainer) HasItems() bool
type Operator ¶
type Operator interface { // Run runs the operation of the stream. It should never return before all the goroutines it started have quit. // It should return in 3 cases: // i) on error (returning the error) // ii) on soft close (its input has closed the channel, return nil) // iii) on hard close (Stop was called on the operator, return nil) Run() error // Stop forces a hard close of the stream. Look at HardStopChannelCloser for a possible implementation. Should be thread-safe. // Stop will only be called once, but it can be called before or after Run exits. If called after, it is a no-op. Stop() error }
Soft stops are created by closing the input channel.
type OrderedChain ¶
type OrderedChain struct {
*SimpleChain
}
func NewOrderedChain ¶
func NewOrderedChain() *OrderedChain
func (*OrderedChain) Add ¶
func (c *OrderedChain) Add(o Operator) Chain
func (*OrderedChain) NewSubChain ¶
func (c *OrderedChain) NewSubChain() Chain
type ParallelizableOperator ¶
type ParallelizableOperator interface { Operator IsParallel() bool IsOrdered() bool MakeOrdered() ParallelizableOperator }
type ProcessedNotifier ¶
type ProcessedNotifier interface { NotificationChannel() <-chan uint Notify(count uint) Blocking() bool }
func NewNonBlockingProcessedNotifier ¶
func NewNonBlockingProcessedNotifier(slack int) ProcessedNotifier
func NewProcessedNotifier ¶
func NewProcessedNotifier() ProcessedNotifier
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
func (*Runner) AsyncRunAll ¶
func (r *Runner) AsyncRunAll()
func (*Runner) CloseNotifier ¶
func (*Runner) ErrorChannel ¶
type RunningCount ¶
type RunningCount struct {
// contains filtered or unexported fields
}
func NewRunningCount ¶
func NewRunningCount(sz int) *RunningCount
func (*RunningCount) Add ¶
func (rc *RunningCount) Add(i int)
func (*RunningCount) GetAverage ¶
func (rc *RunningCount) GetAverage() int
func (*RunningCount) GetAverageMin ¶
func (rc *RunningCount) GetAverageMin(min int) int
type SimpleChain ¶
type SimpleChain struct { Name string // contains filtered or unexported fields }
A SimpleChain implements the operator interface too!
func NewChain ¶
func NewChain() *SimpleChain
func NewSimpleChain ¶
func NewSimpleChain() *SimpleChain
func (*SimpleChain) Add ¶
func (c *SimpleChain) Add(o Operator) Chain
func (*SimpleChain) NewSubChain ¶
func (c *SimpleChain) NewSubChain() Chain
func (*SimpleChain) Operators ¶
func (c *SimpleChain) Operators() []Operator
func (*SimpleChain) SetName ¶
func (c *SimpleChain) SetName(name string) Chain
func (*SimpleChain) SoftStop ¶
func (c *SimpleChain) SoftStop() error
func (*SimpleChain) Start ¶
func (c *SimpleChain) Start() error
func (*SimpleChain) Stop ¶
func (c *SimpleChain) Stop() error
A stop is a hard stop as per the Operator interface
func (*SimpleChain) Wait ¶
func (c *SimpleChain) Wait() error
type SimpleProcessedNotifier ¶
type SimpleProcessedNotifier struct { Block bool // contains filtered or unexported fields }
func (*SimpleProcessedNotifier) Blocking ¶
func (n *SimpleProcessedNotifier) Blocking() bool
func (*SimpleProcessedNotifier) NotificationChannel ¶
func (n *SimpleProcessedNotifier) NotificationChannel() <-chan uint
func (*SimpleProcessedNotifier) Notify ¶
func (n *SimpleProcessedNotifier) Notify(count uint)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.