stream

package
v0.0.0-...-2c2c8a3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2015 License: BSD-2-Clause Imports: 6 Imported by: 14

Documentation

Overview

Package operator implements streaming operators

Index

Constants

View Source
const CHAN_SLACK = 100

This is the default channel slack operators should use when creating output channels

Variables

This section is empty.

Functions

func Name

func Name(op Operator) string

Types

type BaseIn

type BaseIn struct {
	// contains filtered or unexported fields
}

func NewBaseIn

func NewBaseIn(slack int) *BaseIn

func (*BaseIn) GetInDepth

func (o *BaseIn) GetInDepth() int

func (*BaseIn) In

func (o *BaseIn) In() chan Object

func (*BaseIn) SetIn

func (o *BaseIn) SetIn(c chan Object)

type BaseInOutOp

type BaseInOutOp struct {
	*HardStopChannelCloser
	*BaseIn
	*BaseOut
}

func NewBaseInOutOp

func NewBaseInOutOp(slack int) *BaseInOutOp

type BaseOut

type BaseOut struct {
	// contains filtered or unexported fields
}

func NewBaseOut

func NewBaseOut(slack int) *BaseOut

func (*BaseOut) CloseOutput

func (o *BaseOut) CloseOutput()

func (*BaseOut) Out

func (o *BaseOut) Out() chan Object

func (*BaseOut) SetCloseOnExit

func (o *BaseOut) SetCloseOnExit(flag bool)

func (*BaseOut) SetOut

func (o *BaseOut) SetOut(c chan Object)

type BatchContainer

type BatchContainer interface {
	Flush(chan<- Object) bool
	FlushAll(chan<- Object) bool
	HasItems() bool
	Add(object Object)
	IsFull() bool //container shouldn't remain full after a flush. Invariant: IsFull() implies HasItems()
}

type BatcherOperator

type BatcherOperator struct {
	*HardStopChannelCloser
	*BaseIn
	*BaseOut

	MaxOutstanding uint
	// contains filtered or unexported fields
}

func NewBatchOperator

func NewBatchOperator(name string, container BatchContainer, processedDownstream ProcessedNotifier) *BatcherOperator

func NewInterfaceBatchOp

func NewInterfaceBatchOp(pn ProcessedNotifier) *BatcherOperator

func (*BatcherOperator) DownstreamCanAcceptFlush

func (op *BatcherOperator) DownstreamCanAcceptFlush() bool

INVARIANT CAN FLUSH OR WAITING: DownstreamCanAcceptFlush || DownstreamWillCallback

func (*BatcherOperator) DownstreamWillCallback

func (op *BatcherOperator) DownstreamWillCallback() bool

func (*BatcherOperator) Flush

func (op *BatcherOperator) Flush()

func (*BatcherOperator) LastFlush

func (op *BatcherOperator) LastFlush()

func (*BatcherOperator) Run

func (op *BatcherOperator) Run() error

func (*BatcherOperator) SetTimeouts

func (op *BatcherOperator) SetTimeouts(td time.Duration)

type Chain

type Chain interface {
	Operators() []Operator
	Run() error
	Stop() error
	Add(o Operator) Chain
	SetName(string) Chain
	//NewSubChain creates a new empty
	//chain inheriting the properties of the parent chain
	//Usefull for distribute/fanout building functions
	NewSubChain() Chain
	//async functions
	Start() error
	Wait() error
}

type ChannelFanoutOperator

type ChannelFanoutOperator struct {
	*HardStopChannelCloser
	*BaseIn
	// contains filtered or unexported fields
}

ChannelFanoutOperator does a fanout of info across a set of downstream channels. acts synchronously in that it will write to all downstreams before reading the next upstream channel.

func NewChannelFanoutOp

func NewChannelFanoutOp() *ChannelFanoutOperator

func (*ChannelFanoutOperator) Add

func (op *ChannelFanoutOperator) Add(newChannel chan Object)

func (*ChannelFanoutOperator) Run

func (op *ChannelFanoutOperator) Run() error

type DistribKey

type DistribKey interface{}

type DistributeOperator

type DistributeOperator struct {
	*HardStopChannelCloser
	*BaseIn
	// contains filtered or unexported fields
}

func NewDistributor

func NewDistributor(mapp func(Object) DistribKey, creator func(DistribKey) (DistributorChildOp, bool)) *DistributeOperator

func (*DistributeOperator) Run

func (op *DistributeOperator) Run() error

type DistributorChildOp

type DistributorChildOp interface {
	Operator
	In
}

type FailFastRunner

type FailFastRunner struct {
	*FailSilentRunner
	// contains filtered or unexported fields
}

func NewFailFastRunner

func NewFailFastRunner() *FailFastRunner

func (*FailFastRunner) SetErrorHandler

func (t *FailFastRunner) SetErrorHandler(h func(error))

type FailSilentRunner

type FailSilentRunner struct {
	Name string
	// contains filtered or unexported fields
}

func NewFailSilentRunner

func NewFailSilentRunner() *FailSilentRunner

func (*FailSilentRunner) Add

func (r *FailSilentRunner) Add(op Operator)

func (*FailSilentRunner) AsyncRun

func (r *FailSilentRunner) AsyncRun(op Operator)

func (*FailSilentRunner) AsyncRunAll

func (r *FailSilentRunner) AsyncRunAll()

func (*FailSilentRunner) HardStop

func (r *FailSilentRunner) HardStop()

func (*FailSilentRunner) Operators

func (r *FailSilentRunner) Operators() []Operator

func (*FailSilentRunner) Run

func (c *FailSilentRunner) Run() error

func (*FailSilentRunner) SetErrorHandler

func (r *FailSilentRunner) SetErrorHandler(handler func(error))

handler will be called on each error

func (*FailSilentRunner) SetFinishedHandler

func (r *FailSilentRunner) SetFinishedHandler(handler func())

handler will be called when all ops exited

func (*FailSilentRunner) SetName

func (c *FailSilentRunner) SetName(name string)

func (*FailSilentRunner) SetOpCloseHandler

func (r *FailSilentRunner) SetOpCloseHandler(handler func(Operator, error))

handler to be called when an op exits

func (*FailSilentRunner) Stop

func (c *FailSilentRunner) Stop() error

func (*FailSilentRunner) Wait

func (r *FailSilentRunner) Wait() error

func (*FailSilentRunner) WaitGroup

func (r *FailSilentRunner) WaitGroup() *sync.WaitGroup

type FaninOperator

type FaninOperator struct {
	*HardStopChannelCloser
	// contains filtered or unexported fields
}

func NewFaninOp

func NewFaninOp() *FaninOperator

func (*FaninOperator) AddSrc

func (op *FaninOperator) AddSrc(newOp faninSrcOp)

func (*FaninOperator) Out

func (op *FaninOperator) Out() chan Object

func (*FaninOperator) Run

func (op *FaninOperator) Run() error

func (*FaninOperator) SetDest

func (op *FaninOperator) SetDest(newOp faninDestOp)

func (*FaninOperator) SetOut

func (o *FaninOperator) SetOut(c chan Object)

func (*FaninOperator) Stop

func (op *FaninOperator) Stop() error

type FanoutOperator

type FanoutOperator struct {
	*HardStopChannelCloser
	*BaseIn
	// contains filtered or unexported fields
}

func NewFanoutOp

func NewFanoutOp() *FanoutOperator

func (*FanoutOperator) Add

func (op *FanoutOperator) Add(newOp fanoutChildOp)

func (*FanoutOperator) Run

func (op *FanoutOperator) Run() error

type HardStopChannelCloser

type HardStopChannelCloser struct {
	StopNotifier chan bool
}

func NewHardStopChannelCloser

func NewHardStopChannelCloser() *HardStopChannelCloser

func (*HardStopChannelCloser) Stop

func (op *HardStopChannelCloser) Stop() error

type In

type In interface {
	In() chan Object
	SetIn(c chan Object)
	GetInDepth() int
}

type InChain

type InChain interface {
	Chain
	In
}

func NewInChainWrapper

func NewInChainWrapper(c Chain) InChain

type InOutOperator

type InOutOperator interface {
	Operator
	Out
	In
}

type InterfaceContainer

type InterfaceContainer struct {
	// contains filtered or unexported fields
}

func NewInterfaceContainer

func NewInterfaceContainer() *InterfaceContainer

func (*InterfaceContainer) Add

func (c *InterfaceContainer) Add(obj Object)

func (*InterfaceContainer) Flush

func (c *InterfaceContainer) Flush(out chan<- Object) bool

func (*InterfaceContainer) FlushAll

func (c *InterfaceContainer) FlushAll(out chan<- Object) bool

func (*InterfaceContainer) HasItems

func (c *InterfaceContainer) HasItems() bool

func (*InterfaceContainer) IsFull

func (c *InterfaceContainer) IsFull() bool

type Object

type Object interface{}

type Operator

type Operator interface {

	// Run runs the operation of the stream. It should never return before all the goroutines it started have quit
	// It should return in 3 cases:
	// i) on error (returning the error)
	// ii) on soft close (Its input has closed the channel, return nil)
	// iii) on hard close (Stop was called on the operator, return nil)
	Run() error

	// Stop force a hard close of the stream. Look at HardStopChannelCloser for a possible implementation. Should be thread-safe
	// Stop will probably be called once but should be idempotent. It can be called before or after run exits. If called after, is a no-op
	Stop() error
}

soft stops are created by closing the input channel

type OrderedChain

type OrderedChain struct {
	*SimpleChain
}

func NewOrderedChain

func NewOrderedChain() *OrderedChain

func (*OrderedChain) Add

func (c *OrderedChain) Add(o Operator) Chain

func (*OrderedChain) NewSubChain

func (c *OrderedChain) NewSubChain() Chain

type Out

type Out interface {
	Out() chan Object
	SetOut(c chan Object)
}

type ParallelizableOperator

type ParallelizableOperator interface {
	Operator
	IsParallel() bool
	IsOrdered() bool
	MakeOrdered() ParallelizableOperator
}

type ProcessedNotifier

type ProcessedNotifier interface {
	NotificationChannel() <-chan uint
	Notify(count uint)
	Blocking() bool
}

func NewNonBlockingProcessedNotifier

func NewNonBlockingProcessedNotifier(slack int) ProcessedNotifier

func NewProcessedNotifier

func NewProcessedNotifier() ProcessedNotifier

type Runner

type Runner interface {
	SetName(name string)
	WaitGroup() *sync.WaitGroup
	Wait() error

	Operators() []Operator
	AsyncRun(op Operator)

	Add(op Operator)
	AsyncRunAll()
	HardStop()

	/* operator compat */
	Run() error
	Stop() error

	/* handler will be called on each error */
	SetErrorHandler(handler func(error))

	/* handler will be called when all ops exited */
	SetFinishedHandler(handler func())

	/* handler to be called when an op exits */
	SetOpCloseHandler(handler func(Operator, error))
}

func NewRunner

func NewRunner() Runner

type RunningCount

type RunningCount struct {
	// contains filtered or unexported fields
}

func NewRunningCount

func NewRunningCount(sz int) *RunningCount

func (*RunningCount) Add

func (rc *RunningCount) Add(i int)

func (*RunningCount) GetAverage

func (rc *RunningCount) GetAverage() int

func (*RunningCount) GetAverageMin

func (rc *RunningCount) GetAverageMin(min int) int

type SimpleChain

type SimpleChain struct {
	Name string
	// contains filtered or unexported fields
}

A SimpleChain implements the operator interface too!

func NewChain

func NewChain() *SimpleChain

func NewSimpleChain

func NewSimpleChain() *SimpleChain

func (*SimpleChain) Add

func (c *SimpleChain) Add(o Operator) Chain

func (*SimpleChain) NewSubChain

func (c *SimpleChain) NewSubChain() Chain

func (*SimpleChain) Operators

func (c *SimpleChain) Operators() []Operator

func (*SimpleChain) Run

func (c *SimpleChain) Run() error

Operator compatibility

func (*SimpleChain) SetName

func (c *SimpleChain) SetName(name string) Chain

func (*SimpleChain) SoftStop

func (c *SimpleChain) SoftStop() error

func (*SimpleChain) Start

func (c *SimpleChain) Start() error

func (*SimpleChain) Stop

func (c *SimpleChain) Stop() error

A stop is a hard stop as per the Operator interface

func (*SimpleChain) Wait

func (c *SimpleChain) Wait() error

type SimpleProcessedNotifier

type SimpleProcessedNotifier struct {
	Block bool
	// contains filtered or unexported fields
}

func (*SimpleProcessedNotifier) Blocking

func (n *SimpleProcessedNotifier) Blocking() bool

func (*SimpleProcessedNotifier) NotificationChannel

func (n *SimpleProcessedNotifier) NotificationChannel() <-chan uint

func (*SimpleProcessedNotifier) Notify

func (n *SimpleProcessedNotifier) Notify(count uint)

Directories

Path Synopsis
zmq
zmq

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL