stream

package
v0.0.0-...-75137db Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2013 License: BSD-2-Clause Imports: 7 Imported by: 28

Documentation

Overview

Package stream implements streaming operators.

Index

Constants

View Source
const CHAN_SLACK = 100

This is the default channel slack that operators should use when creating output channels.

Variables

This section is empty.

Functions

func Name

func Name(op Operator) string

Types

type BaseIn

type BaseIn struct {
	// contains filtered or unexported fields
}

func NewBaseIn

func NewBaseIn(slack int) *BaseIn

func (*BaseIn) GetInDepth

func (o *BaseIn) GetInDepth() int

func (*BaseIn) In

func (o *BaseIn) In() chan Object

func (*BaseIn) SetIn

func (o *BaseIn) SetIn(c chan Object)

type BaseInOutOp

type BaseInOutOp struct {
	*HardStopChannelCloser
	*BaseIn
	*BaseOut
}

func NewBaseInOutOp

func NewBaseInOutOp(slack int) *BaseInOutOp

type BaseOut

type BaseOut struct {
	// contains filtered or unexported fields
}

func NewBaseOut

func NewBaseOut(slack int) *BaseOut

func (*BaseOut) Out

func (o *BaseOut) Out() chan Object

func (*BaseOut) SetOut

func (o *BaseOut) SetOut(c chan Object)

type BatchContainer

type BatchContainer interface {
	Flush(chan<- Object) bool
	FlushAll(chan<- Object) bool
	HasItems() bool
	Add(object Object)
}

type BatcherOperator

type BatcherOperator struct {
	*HardStopChannelCloser
	*BaseIn
	*BaseOut

	MaxOutstanding uint
	// contains filtered or unexported fields
}

func NewBatchOperator

func NewBatchOperator(name string, container BatchContainer, processedDownstream ProcessedNotifier) *BatcherOperator

func NewInterfaceBatchOp

func NewInterfaceBatchOp(pn ProcessedNotifier) *BatcherOperator

func (*BatcherOperator) DownstreamCanAcceptFlush

func (op *BatcherOperator) DownstreamCanAcceptFlush() bool

INVARIANT CAN FLUSH OR WAITING: DownstreamCanAcceptFlush || DownstreamWillCallback

func (*BatcherOperator) DownstreamWillCallback

func (op *BatcherOperator) DownstreamWillCallback() bool

func (*BatcherOperator) Flush

func (op *BatcherOperator) Flush()

func (*BatcherOperator) LastFlush

func (op *BatcherOperator) LastFlush()

func (*BatcherOperator) Run

func (op *BatcherOperator) Run() error

func (*BatcherOperator) SetTimeouts

func (op *BatcherOperator) SetTimeouts(td time.Duration)

type Chain

type Chain interface {
	Operators() []Operator
	Run() error
	Stop() error
	Add(o Operator) Chain
	SetName(string) Chain

	//NewSubChain creates a new empty chain inheriting the properties of the parent chain
	//Useful for building distribute/fanout functions
	NewSubChain() Chain

	//async functions
	Start() error
	Wait() error
}

type DistribKey

type DistribKey interface{}

type DistributeOperator

type DistributeOperator struct {
	*HardStopChannelCloser
	*BaseIn
	// contains filtered or unexported fields
}

func NewDistributor

func NewDistributor(mapp func(Object) DistribKey, creator func(DistribKey) DistributorChildOp) *DistributeOperator

func (*DistributeOperator) Run

func (op *DistributeOperator) Run() error

type DistributorChildOp

type DistributorChildOp interface {
	Operator
	In
}

type FanoutOperator

type FanoutOperator struct {
	*HardStopChannelCloser
	*BaseIn
	// contains filtered or unexported fields
}

func NewFanoutOp

func NewFanoutOp() *FanoutOperator

func (*FanoutOperator) Add

func (op *FanoutOperator) Add(newOp fanoutChildOp)

func (*FanoutOperator) Run

func (op *FanoutOperator) Run() error

type HardStopChannelCloser

type HardStopChannelCloser struct {
	StopNotifier chan bool
}

func NewHardStopChannelCloser

func NewHardStopChannelCloser() *HardStopChannelCloser

func (*HardStopChannelCloser) Stop

func (op *HardStopChannelCloser) Stop() error

type In

type In interface {
	In() chan Object
	SetIn(c chan Object)
	GetInDepth() int
}

type InChain

type InChain interface {
	Chain
	In
}

func NewInChainWrapper

func NewInChainWrapper(c Chain) InChain

type InOutOperator

type InOutOperator interface {
	Operator
	Out
	In
}

type InterfaceContainer

type InterfaceContainer struct {
	// contains filtered or unexported fields
}

func NewInterfaceContainer

func NewInterfaceContainer() *InterfaceContainer

func (*InterfaceContainer) Add

func (c *InterfaceContainer) Add(obj Object)

func (*InterfaceContainer) Flush

func (c *InterfaceContainer) Flush(out chan<- Object) bool

func (*InterfaceContainer) FlushAll

func (c *InterfaceContainer) FlushAll(out chan<- Object) bool

func (*InterfaceContainer) HasItems

func (c *InterfaceContainer) HasItems() bool

type Object

type Object interface{}

type Operator

type Operator interface {

	// Run runs the operation of the stream. It should never return before all the goroutines it started have quit
	// It should return in 3 cases:
	// i) on error (returning the error)
	// ii) on soft close (Its input has closed the channel, return nil)
	// iii) on hard close (Stop was called on the operator, return nil)
	Run() error

	// Stop forces a hard close of the stream. Look at HardStopChannelCloser for a possible implementation. It should be thread-safe.
	// Stop will only be called once, but it can be called before or after Run exits. If called after, it is a no-op.
	Stop() error
}

Soft stops are created by closing the input channel.

type OrderedChain

type OrderedChain struct {
	*SimpleChain
}

func NewOrderedChain

func NewOrderedChain() *OrderedChain

func (*OrderedChain) Add

func (c *OrderedChain) Add(o Operator) Chain

func (*OrderedChain) NewSubChain

func (c *OrderedChain) NewSubChain() Chain

type Out

type Out interface {
	Out() chan Object
	SetOut(c chan Object)
}

type ParallelizableOperator

type ParallelizableOperator interface {
	Operator
	IsParallel() bool
	IsOrdered() bool
	MakeOrdered() ParallelizableOperator
}

type ProcessedNotifier

type ProcessedNotifier interface {
	NotificationChannel() <-chan uint
	Notify(count uint)
	Blocking() bool
}

func NewNonBlockingProcessedNotifier

func NewNonBlockingProcessedNotifier(slack int) ProcessedNotifier

func NewProcessedNotifier

func NewProcessedNotifier() ProcessedNotifier

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner() *Runner

func (*Runner) Add

func (r *Runner) Add(op Operator)

func (*Runner) AsyncRun

func (r *Runner) AsyncRun(op Operator)

func (*Runner) AsyncRunAll

func (r *Runner) AsyncRunAll()

func (*Runner) CloseNotifier

func (r *Runner) CloseNotifier() <-chan bool

func (*Runner) ErrorChannel

func (r *Runner) ErrorChannel() <-chan error

func (*Runner) HardStop

func (r *Runner) HardStop()

func (*Runner) Operators

func (r *Runner) Operators() []Operator

func (*Runner) WaitGroup

func (r *Runner) WaitGroup() *sync.WaitGroup

type RunningCount

type RunningCount struct {
	// contains filtered or unexported fields
}

func NewRunningCount

func NewRunningCount(sz int) *RunningCount

func (*RunningCount) Add

func (rc *RunningCount) Add(i int)

func (*RunningCount) GetAverage

func (rc *RunningCount) GetAverage() int

func (*RunningCount) GetAverageMin

func (rc *RunningCount) GetAverageMin(min int) int

type SimpleChain

type SimpleChain struct {
	Name string
	// contains filtered or unexported fields
}

A SimpleChain also implements the Operator interface.

func NewChain

func NewChain() *SimpleChain

func NewSimpleChain

func NewSimpleChain() *SimpleChain

func (*SimpleChain) Add

func (c *SimpleChain) Add(o Operator) Chain

func (*SimpleChain) NewSubChain

func (c *SimpleChain) NewSubChain() Chain

func (*SimpleChain) Operators

func (c *SimpleChain) Operators() []Operator

func (*SimpleChain) Run

func (c *SimpleChain) Run() error

Operator compatibility

func (*SimpleChain) SetName

func (c *SimpleChain) SetName(name string) Chain

func (*SimpleChain) SoftStop

func (c *SimpleChain) SoftStop() error

func (*SimpleChain) Start

func (c *SimpleChain) Start() error

func (*SimpleChain) Stop

func (c *SimpleChain) Stop() error

A stop is a hard stop as per the Operator interface

func (*SimpleChain) Wait

func (c *SimpleChain) Wait() error

type SimpleProcessedNotifier

type SimpleProcessedNotifier struct {
	Block bool
	// contains filtered or unexported fields
}

func (*SimpleProcessedNotifier) Blocking

func (n *SimpleProcessedNotifier) Blocking() bool

func (*SimpleProcessedNotifier) NotificationChannel

func (n *SimpleProcessedNotifier) NotificationChannel() <-chan uint

func (*SimpleProcessedNotifier) Notify

func (n *SimpleProcessedNotifier) Notify(count uint)

Directories

Path Synopsis
zmq
zmq

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL