core

package
v0.7.1 Latest
Published: Aug 18, 2017 License: MIT Imports: 14 Imported by: 102

Documentation

Overview

Package core provides primitive components used for stream processing.

Constants

const (
	// MaxCapacity is the maximum capacity or buffer size of pipes.
	MaxCapacity int = 1<<17 - 1
)
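
The value of MaxCapacity follows from Go's operator precedence: << binds tighter than -, so the expression is (1<<17) - 1 = 131071. A minimal sketch, which re-declares the constant locally so that it runs without the package. clampCapacity is a hypothetical helper and not part of core; whether the package clamps or rejects out-of-range values is not stated here.

```go
package main

import "fmt"

// maxCapacity mirrors core.MaxCapacity. It is re-declared here only so that
// the sketch compiles on its own.
const maxCapacity int = 1<<17 - 1

// clampCapacity is a hypothetical helper that limits a requested pipe
// capacity to the range [0, maxCapacity].
func clampCapacity(n int) int {
	if n < 0 {
		return 0
	}
	if n > maxCapacity {
		return maxCapacity
	}
	return n
}

func main() {
	fmt.Println(maxCapacity)            // 131071
	fmt.Println(clampCapacity(1 << 20)) // 131071
}
```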

Variables

var (
	// ErrSourceRewound is returned when the source is rewound and it has to
	// reproduce the stream again.
	ErrSourceRewound = errors.New("the source has been rewound")

	// ErrSourceStopped is returned when the source tries to write a tuple
	// after it's stopped. It's currently only returned from a writer passed
	// to a source created by NewRewindableSource.
	ErrSourceStopped = errors.New("the source has been stopped")
)

Functions

func FatalError

func FatalError(err error) error

FatalError decorates the given error so that IsFatalError(FatalError(err)) returns true even if err doesn't have Fatal method. It will panic if err is nil.

func IsFatalError

func IsFatalError(err error) bool

IsFatalError returns true when the given error is fatal. If the error implements the following interface, IsFatalError returns the return value of Fatal method:

interface {
	Fatal() bool
}

Otherwise, the error is considered non-fatal and it returns false.

Each component in the core package behaves differently when it receives a fatal error. Every function and method documents how it behaves on this error, so please read its documentation for details.
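
The check described above can be sketched with a plain interface assertion. This is an illustration under stated assumptions, not the package's implementation: isFatal and diskFullError are hypothetical names defined locally.

```go
package main

import (
	"errors"
	"fmt"
)

// fataler is the behavioral interface described above.
type fataler interface {
	Fatal() bool
}

// diskFullError is a hypothetical error type that reports itself as fatal.
type diskFullError struct{}

func (diskFullError) Error() string { return "disk is full" }
func (diskFullError) Fatal() bool   { return true }

// isFatal returns the result of Fatal when err implements it, and false
// otherwise (including for a nil error).
func isFatal(err error) bool {
	if f, ok := err.(fataler); ok {
		return f.Fatal()
	}
	return false
}

func main() {
	fmt.Println(isFatal(diskFullError{}))    // true
	fmt.Println(isFatal(errors.New("oops"))) // false
}
```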

func IsNotExist

func IsNotExist(err error) bool

IsNotExist returns true when the error is related to "not found" or is os.ErrNotExist. To be consistent with os.IsNotExist, the name of this function is IsNotExist, not IsNotExistError.

If the error implements the following interface, IsNotExist returns the return value of the NotExist method:

interface {
	NotExist() bool
}
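
A minimal sketch of how the two conditions could be combined. It assumes the interface is checked first and os.ErrNotExist second; the actual order and mechanism inside core are not shown on this page. isNotExist and missingStateError are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// notExister is the behavioral interface described above.
type notExister interface {
	NotExist() bool
}

// missingStateError is a hypothetical "not found" error.
type missingStateError struct{ name string }

func (e missingStateError) Error() string  { return "state " + e.name + " was not found" }
func (e missingStateError) NotExist() bool { return true }

// isNotExist consults the NotExist method when it is available and falls
// back to comparing against os.ErrNotExist.
func isNotExist(err error) bool {
	if n, ok := err.(notExister); ok {
		return n.NotExist()
	}
	return errors.Is(err, os.ErrNotExist)
}

func main() {
	fmt.Println(isNotExist(missingStateError{name: "model"})) // true
	fmt.Println(isNotExist(os.ErrNotExist))                   // true
	fmt.Println(isNotExist(errors.New("permission denied")))  // false
}
```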

func IsTemporaryError

func IsTemporaryError(err error) bool

IsTemporaryError returns true when the given error is temporary. If the error implements the following interface, IsTemporaryError returns the return value of Temporary method:

interface {
	Temporary() bool
}

Otherwise, the error is considered permanent and it returns false.

Each component in the core package behaves differently when it receives a temporary error. Every function and method documents how it behaves on this error, so please read its documentation for details.

func NewTemporaryID

func NewTemporaryID() int64

NewTemporaryID returns a new temporary 63-bit ID. This can be used for any purpose.

func NotExistError

func NotExistError(err error) error

NotExistError decorates the given error so that IsNotExist returns true even if err doesn't have NotExist method. It will panic if err is nil.

func TemporaryError

func TemporaryError(err error) error

TemporaryError decorates the given error so that IsTemporaryError(TemporaryError(err)) returns true even if err doesn't have Temporary method. It will panic if err is nil.
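
The decorator pattern used by FatalError, NotExistError, and TemporaryError can be sketched as a small wrapper type. The names asTemporary, isTemporary, and temporaryError are hypothetical and defined locally; only the documented behavior (the wrapped error reports Temporary, and a nil argument panics) is taken from the text above.

```go
package main

import (
	"errors"
	"fmt"
)

// temporaryError wraps an error and marks it as temporary.
type temporaryError struct{ err error }

func (e *temporaryError) Error() string   { return e.err.Error() }
func (e *temporaryError) Temporary() bool { return true }

// asTemporary is a hypothetical stand-in for TemporaryError. Like the
// documented function, it panics when err is nil.
func asTemporary(err error) error {
	if err == nil {
		panic("asTemporary: err must not be nil")
	}
	return &temporaryError{err: err}
}

// isTemporary mirrors the documented check on the Temporary method.
func isTemporary(err error) bool {
	t, ok := err.(interface{ Temporary() bool })
	return ok && t.Temporary()
}

// errTimeout is a sample error without a Temporary method.
var errTimeout = errors.New("connection timed out")

func main() {
	fmt.Println(isTemporary(errTimeout))              // false
	fmt.Println(isTemporary(asTemporary(errTimeout))) // true
	fmt.Println(asTemporary(errTimeout))              // connection timed out
}
```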

func ValidateSymbol

func ValidateSymbol(name string) error

ValidateSymbol validates if the given string has valid format and length for user-defined entities and is not one of the reserved words. The minimum length of a symbol is 1 and the maximum is 127. The format has to be [a-zA-Z][a-zA-Z0-9_]*.
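
The length and format rules can be sketched with a regular expression. checkSymbol is a hypothetical, simplified stand-in: it does not reproduce the reserved-word check, because the list of reserved words is not given here.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// symbolPattern encodes the documented format [a-zA-Z][a-zA-Z0-9_]*.
var symbolPattern = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9_]*$`)

// checkSymbol checks only the length (1 to 127) and the format of name.
// Reserved words are NOT checked in this sketch.
func checkSymbol(name string) error {
	if len(name) < 1 || len(name) > 127 {
		return errors.New("symbol length must be between 1 and 127")
	}
	if !symbolPattern.MatchString(name) {
		return errors.New("symbol must match [a-zA-Z][a-zA-Z0-9_]*")
	}
	return nil
}

func main() {
	for _, name := range []string{"sensor_1", "1sensor", "has-dash", ""} {
		fmt.Printf("%q valid: %v\n", name, checkSymbol(name) == nil)
	}
}
```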

Types

type AtomicFlag

type AtomicFlag int32

AtomicFlag is a boolean flag which can be read/written atomically.

func (*AtomicFlag) Enabled

func (a *AtomicFlag) Enabled() bool

Enabled returns true if the flag is enabled.

func (*AtomicFlag) Set

func (a *AtomicFlag) Set(b bool)

Set sets a boolean value to the flag.
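
A flag of this kind can be built on sync/atomic. The sketch below uses a local type named flag with the same underlying type as AtomicFlag; the encoding of true as 1 is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flag is a local stand-in for AtomicFlag with the same underlying type.
type flag int32

// Set stores 1 for true and 0 for false with an atomic write.
func (f *flag) Set(b bool) {
	var v int32
	if b {
		v = 1
	}
	atomic.StoreInt32((*int32)(f), v)
}

// Enabled reads the flag atomically.
func (f *flag) Enabled() bool {
	return atomic.LoadInt32((*int32)(f)) != 0
}

func main() {
	var trace flag // the zero value is disabled
	fmt.Println(trace.Enabled()) // false
	trace.Set(true)
	fmt.Println(trace.Enabled()) // true
}
```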

type Box

type Box interface {
	// Process is called on a Box for each item in the input stream.
	// The processed result must be written to the given Writer
	// object.
	// A Box must not modify the given Tuple object directly if its TFShared
	// flag is set. A Box can create a copy of the Tuple and modify it instead
	// of the original. If TFShared flag is not set, a Box can directly modify
	// the Tuple and does not have to create a copy. A Box can keep a reference
	// to the written object and then access it at a later point in time by
	// setting its TFShared flag. In addition, if the object is copied by Copy
	// method, a box can keep the copied object and freely modify it later.
	// Note that this specification was added after v0.4 and some test code
	// doesn't follow this rule. Please do not reuse such inappropriate
	// implementations.
	//
	// Note that there may be multiple concurrent calls to Process,
	// so if internal state is accessed, proper locking mechanisms
	// must be used.
	// Also note that the same Context pointer that was passed to
	// Process should be handed over to Writer.Write so that the
	// same settings will be used by that Writer.
	//
	// Process can tell the caller about each error in more detail by
	// returning behavioral error types. The caller of Process must
	// follow three rules:
	//
	//	1. The caller must not call Process again if the error is fatal
	//	   (i.e. IsFatalError(err) == true). Because there can be multiple
	//	   goroutines on a single Box, this rule only applies to the goroutine
	//	   which received a fatal error.
	//	2. The caller may retry the same tuple or discard it if the error is
	//	   temporary but not fatal (i.e. IsFatalError(err) == false && IsTemporaryError(err) == true).
	//	   The number of retries the caller should attempt is not defined.
	//	3. The caller must discard the tuple and must not retry if the error
	//	   is neither temporary nor fatal (i.e. IsFatalError(err) == false && IsTemporaryError(err) == false).
	//	   The caller can call Process again with a different tuple; that is,
	//	   it can simply skip the tuple for which Process returned the error.
	//
	// Once Process returns a fatal error, it must always return fatal errors
	// after that. Process might be called even after it returned a fatal error.
	// Terminate method will be called even if Process returns a fatal error.
	//
	// When Process returns a temporary error, the call shouldn't have any
	// side effect on the state of the Box; that is, consecutive retry calls
	// should only be reflected once regardless of the number of attempts the
	// caller makes. For example, assume a Box B is counting the words in
	// tuples and a caller invokes B.Process with a tuple t. If B.Process
	// with t returns a temporary error, the count B holds shouldn't change
	// until the retry succeeds.
	Process(ctx *Context, t *Tuple, w Writer) error
}

A Box is an elementary building block of a SensorBee topology. It is the equivalent of a StreamTask in Samza or a Bolt in Storm.
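
The three caller rules in the Process documentation can be sketched as a small decision function. Everything here is local and hypothetical (behaviorError, decide, callWithRetry). Rule 2 leaves the choice between retrying and discarding to the caller, and the retry limit maxRetries is this sketch's own policy, since the number of retries is not defined.

```go
package main

import (
	"errors"
	"fmt"
)

// behaviorError is a hypothetical error carrying both behavioral flags.
type behaviorError struct {
	msg              string
	fatal, temporary bool
}

func (e *behaviorError) Error() string   { return e.msg }
func (e *behaviorError) Fatal() bool     { return e.fatal }
func (e *behaviorError) Temporary() bool { return e.temporary }

func isFatal(err error) bool {
	f, ok := err.(interface{ Fatal() bool })
	return ok && f.Fatal()
}

func isTemporary(err error) bool {
	t, ok := err.(interface{ Temporary() bool })
	return ok && t.Temporary()
}

// decide maps the error returned from Process to the caller's next step.
func decide(err error) string {
	switch {
	case err == nil:
		return "next" // success: continue with the next tuple
	case isFatal(err):
		return "stop" // rule 1: never call Process again
	case isTemporary(err):
		return "retry" // rule 2: the same tuple may be retried
	default:
		return "skip" // rule 3: discard the tuple, no retry
	}
}

// callWithRetry calls process until the outcome is no longer "retry" or
// maxRetries retries have been used, in which case the tuple is skipped.
func callWithRetry(process func() error, maxRetries int) string {
	for attempt := 0; ; attempt++ {
		outcome := decide(process())
		if outcome != "retry" {
			return outcome
		}
		if attempt >= maxRetries {
			return "skip"
		}
	}
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return &behaviorError{msg: "busy", temporary: true}
		}
		return nil
	}
	outcome := callWithRetry(flaky, 5)
	fmt.Println(outcome, calls)                  // next 3
	fmt.Println(decide(errors.New("bad tuple"))) // skip
}
```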

func BoxFunc

func BoxFunc(b func(ctx *Context, t *Tuple, w Writer) error) Box

BoxFunc can be used to add all methods required to fulfill the Box interface to a normal function with the signature

func(ctx *Context, t *Tuple, s Writer) error

Example:

forward := func(ctx *Context, t *Tuple, w Writer) error {
    // Return the error from Write so that the caller can react to it.
    return w.Write(ctx, t)
}
var box Box = BoxFunc(forward)
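
The adapter idea behind BoxFunc is the same one net/http uses for HandlerFunc: a function type that implements the interface by calling itself. The following self-contained sketch uses minimal local stand-ins for Context, Tuple, Writer, and Box (their real definitions are richer), and collector is a hypothetical Writer used only for the demonstration.

```go
package main

import "fmt"

// Context, Tuple, Writer and Box are minimal local stand-ins for the core
// types so that the sketch compiles on its own.
type Context struct{}

type Tuple struct{ Data map[string]interface{} }

type Writer interface {
	Write(ctx *Context, t *Tuple) error
}

type Box interface {
	Process(ctx *Context, t *Tuple, w Writer) error
}

// boxFunc adapts a plain function to the Box interface.
type boxFunc func(ctx *Context, t *Tuple, w Writer) error

// Process calls the underlying function.
func (f boxFunc) Process(ctx *Context, t *Tuple, w Writer) error {
	return f(ctx, t, w)
}

// collector is a hypothetical Writer that stores what it receives.
type collector struct{ tuples []*Tuple }

func (c *collector) Write(ctx *Context, t *Tuple) error {
	c.tuples = append(c.tuples, t)
	return nil
}

func main() {
	var forward Box = boxFunc(func(ctx *Context, t *Tuple, w Writer) error {
		return w.Write(ctx, t) // hand the same Context over to the Writer
	})
	out := &collector{}
	err := forward.Process(&Context{}, &Tuple{Data: map[string]interface{}{"v": 1}}, out)
	fmt.Println(err, len(out.tuples)) // <nil> 1
}
```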

type BoxConfig

type BoxConfig struct {

	// RemoveOnStop is a flag which controls what happens to the box when it
	// stops. If it is true, the box is removed from the topology when it stops.
	RemoveOnStop bool

	// Meta contains meta information of the box. This field won't be used
	// by core package and application can store any form of information
	// related to the box.
	Meta interface{}
}

BoxConfig has configuration parameters of a Box node.

type BoxInputConfig

type BoxInputConfig struct {
	// InputName is a custom name attached to incoming tuples. When it is empty,
	// "*" will be used.
	InputName string

	// Capacity is the maximum capacity or buffer size (length) of input pipe.
	// When this parameter is 0, the default value is used. This parameter is
	// only used as a hint and doesn't guarantee that the pipe can actually have
	// the specified number of tuples.
	Capacity int

	// DropMode is a mode which controls the behavior of dropping tuples at the
	// output side of the queue when it is full.
	DropMode QueueDropMode
}

BoxInputConfig has parameters to customize input behavior of a Box on each input pipe.

func (*BoxInputConfig) Validate added in v0.5.0

func (c *BoxInputConfig) Validate() error

Validate validates values of BoxInputConfig.

type BoxNode

type BoxNode interface {
	Node

	// Box returns the internal Box passed to Topology.AddBox.
	Box() Box

	// Input adds a new input from a Source, another Box, or even the Box
	// itself. refname refers to the name of the node from which the Box wants
	// to receive tuples. There must be a Source or a Box having that name.
	Input(refname string, config *BoxInputConfig) error

	// EnableGracefulStop activates a graceful stop mode. If it is enabled,
	// Stop method waits until the Box doesn't have an incoming tuple. The Box
	// doesn't wait until, for example, a source generates all tuples. It only
	// waits for the moment when the Box's input queue gets empty and stops
	// even if some inputs are about to send a new tuple to the Box.
	EnableGracefulStop()

	// StopOnDisconnect tells the Box that it may automatically stop when all
	// inbound or outbound connections (channels or pipes) are closed. After
	// calling this method, the Box can automatically stop even if Stop method
	// isn't explicitly called.
	//
	// dir can be Inbound, Outbound, or a bitwise OR of them. When both
	// Inbound and Outbound are specified, the Box stops if all inbound
	// connections are closed OR all outbound connections are closed. For
	// example, when a Box has two inbound connections and one outbound
	// connection, it stops if the outbound connection is closed while the
	// two inbound connections are still active.
	//
	// Currently, there's no way to disable StopOnDisconnect once it's enabled.
	// Also, it simply overwrites the direction flag as follows, so Inbound and
	// Outbound can be set separately:
	//
	//	boxNode.StopOnDisconnect(core.Inbound | core.Outbound)
	//	boxNode.StopOnDisconnect(core.Outbound) // core.Inbound is still enabled.
	StopOnDisconnect(dir ConnDir)
}

BoxNode is a Box registered to a topology.

type ConnDir

type ConnDir int

ConnDir shows a direction of a connection between nodes.

const (
	// Inbound represents an inbound connection.
	Inbound ConnDir = 1 << iota

	// Outbound represents an outbound connection.
	Outbound
)
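
Because the constants are declared with 1 << iota, each direction occupies its own bit and the two can be combined with a bitwise OR. A minimal sketch with the type and constants re-declared locally; has is a hypothetical helper.

```go
package main

import "fmt"

// ConnDir and its constants are re-declared here as shown above.
type ConnDir int

const (
	Inbound  ConnDir = 1 << iota // 1
	Outbound                     // 2
)

// has reports whether every bit of want is set in d.
func has(d, want ConnDir) bool {
	return d&want == want
}

func main() {
	both := Inbound | Outbound
	fmt.Println(int(Inbound), int(Outbound), int(both)) // 1 2 3
	fmt.Println(has(both, Inbound))                     // true
	fmt.Println(has(Outbound, Inbound))                 // false
}
```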

type Context

type Context struct {
	Flags        ContextFlags
	SharedStates SharedStateRegistry
	// contains filtered or unexported fields
}

Context holds a set of functionality that is made available to each Topology at runtime. A context is created by the user before creating a Topology. Each Context is tied to one Topology and it must not be used by multiple topologies.

func NewContext

func NewContext(config *ContextConfig) *Context

NewContext creates a new Context based on the config. If config is nil, the default config will be used.

func (*Context) ErrLog

func (c *Context) ErrLog(err error) *logrus.Entry

ErrLog returns the logger tied to the Context with the given error information attached.

func (*Context) Log

func (c *Context) Log() *logrus.Entry

Log returns the logger tied to the Context.

type ContextConfig

type ContextConfig struct {
	// Logger provides a logrus's logger used by the Context.
	Logger *logrus.Logger
	Flags  ContextFlags
}

ContextConfig has configuration parameters of a Context.

type ContextFlags

type ContextFlags struct {
	// TupleTrace is a Tuple's tracing on/off flag. If the flag is 0
	// (means false), a topology does not trace Tuple's events.
	// The flag can be set when creating a Context, or when the topology
	// is running. In the latter case, Context.Flags.TupleTrace.Set() should
	// be used for thread safety.
	// There is a delay between setting the flag and the moment tracing of
	// Tuples actually starts or stops.
	TupleTrace AtomicFlag

	// DroppedTupleLog is a flag which turns on/off logging of dropped tuple
	// events. When DestinationlessTupleLog flag isn't set, Destinationless
	// tuples are not logged even if this flag is set.
	DroppedTupleLog AtomicFlag

	// DestinationlessTupleLog is a flag which turns on/off logging of
	// destinationless tuple events. A destinationless tuple is one kind of
	// dropped tuple: it is generated when a source or a stream doesn't have
	// any destination and, therefore, the tuple is dropped. It often happens
	// when a topology isn't fully built.
	//
	// To log destinationless tuples, DroppedTupleLog flag also needs to be set.
	DestinationlessTupleLog AtomicFlag

	// DroppedTupleSummarization is a flag to turn on/off summarization of
	// dropped tuple logging. If this flag is enabled, tuples being logged will
	// be a little smaller than the originals. However, they might not be
	// parsable as JSON. If the flag is disabled, the output can be parsed as JSON.
	DroppedTupleSummarization AtomicFlag
}

ContextFlags is an arrangement of SensorBee processing settings.

type EventType

type EventType int

EventType is the type of an event related to Tuple processing.

const (
	// ETInput represents an event where a tuple entered some
	// processing unit (e.g., a Box)
	ETInput EventType = iota
	// ETOutput represents an event where a tuple left some
	// processing unit (e.g., a Box)
	ETOutput
	// ETOther represents any other event
	ETOther
)

func (EventType) String

func (t EventType) String() string

type LoadableSharedState

type LoadableSharedState interface {
	SavableSharedState

	// Load overwrites the state with saved data. Parameters don't have to be
	// the same as Save's parameters. They can even be completely different.
	// There MUST NOT be a required parameter. Values of required parameters
	// should be saved with the state itself.
	//
	// Load and other methods including Save can be called concurrently.
	Load(ctx *Context, r io.Reader, params data.Map) error
}

LoadableSharedState is a SharedState which can be persisted through the Save and Load methods.

type NamedInputBox

type NamedInputBox interface {
	Box

	// InputNames returns a slice of names which the box accepts as input names.
	// If it's empty, any name is accepted. Otherwise, the box only accepts
	// names in the slice.
	InputNames() []string
}

NamedInputBox is a box whose inputs have custom input names.
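
The acceptance rule for InputNames (an empty slice accepts any name) can be sketched as follows. acceptsInput is a hypothetical helper written for this illustration; it is not a function of the package.

```go
package main

import "fmt"

// acceptsInput applies the documented rule: an empty list accepts any input
// name, otherwise the name must appear in the list.
func acceptsInput(accepted []string, name string) bool {
	if len(accepted) == 0 {
		return true
	}
	for _, a := range accepted {
		if a == name {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(acceptsInput(nil, "anything"))                 // true
	fmt.Println(acceptsInput([]string{"left", "right"}, "left")) // true
	fmt.Println(acceptsInput([]string{"left", "right"}, "*"))    // false
}
```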

type Node

type Node interface {
	// Type returns the type of the node, which can be NTSource, NTBox,
	// or NTSink. It's safe to convert Node to a specific node interface
	// corresponding to the returned NodeType. For example, if NTBox is
	// returned, the node can be converted to BoxNode with a type
	// assertion.
	Type() NodeType

	// Name returns the name of the node in the registered topology.
	Name() string

	// State returns the current state of the node.
	State() TopologyStateHolder

	// Stop stops the node. When the node is a source, Stop waits until the
	// source actually stops generating tuples. When the node is a box or a
	// sink, it waits until the box or the sink is terminated.
	//
	// By default, the node is not removed from the topology after it stops.
	// Use the RemoveOnStop option to remove it from the topology when it
	// stops.
	//
	// Stop never panics.
	Stop() error

	// Status returns the status of the node. Each node type returns different
	// status information.
	//
	// When the node is a Source, the following information will be returned:
	//
	//	* state: the current state of the Source
	//	* error: an error message of the Source if an error happened and it stopped the Source
	//	* output_stats: statistical information of the Source's output
	//	* behaviors:
	//		* stop_on_disconnect: true if the Source stops when all outbound
	//		                      connections are closed
	//		* remove_on_stop: true if the Source is removed from the topology
	//		                  when it stops
	//	* source: the status of the Source if it implements Statuser
	//
	// When the node is a Box, the following information will be returned:
	//
	//	* state: the current state of the Box
	//	* error: an error message of the Box if an error happened and it stopped the Box
	//	* input_stats: statistical information of the Box's input
	//	* output_stats: statistical information of the Box's output
	//	* behaviors:
	//		* stop_on_inbound_disconnect: true if the Box stops when all inbound
	//		                              connections are closed
	//		* stop_on_outbound_disconnect: true if the Box stops when all
	//		                               outbound connections are closed
	//		* graceful_stop: true if the graceful_stop mode is enabled
	//		* remove_on_stop: true if the Box is removed from the topology
	//		                  when it stops
	//	* box: the status of the Box if it implements Statuser
	//
	// When the node is a Sink, the following information will be returned:
	//
	//	* state: the current state of the Sink
	//	* error: an error message of the Sink if an error happened and it stopped the Sink
	//	* input_stats: statistical information of the Sink's input
	//	* behaviors:
	//		* stop_on_disconnect: true if the Sink stops when all inbound
	//		                      connections are closed
	//		* graceful_stop: true if the graceful_stop mode is enabled
	//		* remove_on_stop: true if the Sink is removed from the topology
	//		                  when it stops
	//	* sink: the status of the Sink if it implements Statuser
	//
	// "input_stats" contains statistical information of the node's input. It
	// has the following fields:
	//
	//	* num_received_total: the total number of tuples the node received
	//	* num_errors: the number of errors that occurred while the node was
	//	              processing tuples, including temporary errors
	//	* inputs: the information of data sources connected to the node
	//
	// "inputs" field in "input_stats" contains the input statistics of each
	// data source as data.Map. Each input has the following information:
	//
	//	* num_received: the number of tuples the node has received so far
	//	* queue_size: the size of the queue connected to the node
	//	* num_queued: the number of tuples buffered in the queue
	//
	// "output_stats" contains statistical information of the node's output. It
	// has the following fields:
	//
	//	* num_sent_total: the total number of tuples sent from this node including
	//	                  the number of dropped tuples
	//	* num_dropped: the number of tuples which have been dropped because no
	//	               data destination is connected to the node
	//	* outputs: the information of data destinations connected to the node
	//
	// "outputs" contains the output statistics of each data destination as
	// data.Map. Each output has the following information:
	//
	//	* num_sent: the number of tuples the node has sent so far
	//	* queue_size: the size of the queue connected to the node
	//	* num_queued: the number of tuples buffered in the queue
	//
	// Numbers in inputs and outputs might not be accurate because they use
	// loose synchronization for efficiency.
	Status() data.Map

	// Meta returns meta information of the node. The meta information can be
	// updated by changing the return value. However, the meta information is
	// not protected from concurrent writes and the caller has to care about it.
	Meta() interface{}

	// RemoveOnStop tells the Node that it may automatically be removed from
	// the topology when it stops.
	RemoveOnStop()
}

Node is a node registered to a topology. It defines methods common to Source, Box, and Sink nodes.

type NodeType

type NodeType int

NodeType represents the type of a node in a topology.

const (
	// NTSource means the node is a Source.
	NTSource NodeType = iota

	// NTBox means the node is a Box.
	NTBox

	// NTSink means the node is a Sink.
	NTSink
)

func (NodeType) String

func (t NodeType) String() string

type QueueDropMode

type QueueDropMode int

QueueDropMode controls how tuples are dropped from a pipe when an output tuple is written to a queue that is full. Dropping does not take the precision of computation results into account. When a Box connected to an output queue supports a sophisticated load-shedding algorithm of its own, specify DropNone.

const (
	// DropNone is the QueueDropMode in which a Source or a Box doesn't drop
	// any tuple when its output queue is full. This is the default mode.
	DropNone QueueDropMode = iota

	// DropLatest is the QueueDropMode in which a Source or a Box drops the
	// latest tuple (i.e. the tuple which is being sent) when its output queue
	// is full.
	DropLatest

	// DropOldest is the QueueDropMode in which a Source or a Box drops the
	// oldest tuple being queued when its output queue is full.
	DropOldest
)
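
The difference between the modes can be illustrated with a small bounded queue. This is a single-goroutine sketch with hypothetical local names (queue, push, dropMode), not the pipe implementation used by core. In particular, with DropNone a writer is presumably made to wait until space is available, which this sketch only reports as a failed push.

```go
package main

import "fmt"

type dropMode int

const (
	dropNone dropMode = iota
	dropLatest
	dropOldest
)

// queue is a hypothetical bounded FIFO of ints standing in for a pipe of
// tuples.
type queue struct {
	items []int
	limit int
	mode  dropMode
}

// push adds v and reports whether v was stored.
func (q *queue) push(v int) bool {
	if q.limit <= 0 {
		return false
	}
	if len(q.items) < q.limit {
		q.items = append(q.items, v)
		return true
	}
	switch q.mode {
	case dropOldest:
		q.items = append(q.items[1:], v) // discard the head, keep v
		return true
	default:
		// dropLatest discards v. dropNone would not drop anything; this
		// sketch cannot block, so it reports failure instead.
		return false
	}
}

func main() {
	for _, m := range []dropMode{dropLatest, dropOldest} {
		q := &queue{limit: 2, mode: m}
		for v := 1; v <= 3; v++ {
			q.push(v)
		}
		fmt.Println(q.items)
	}
	// Prints [1 2] for dropLatest and [2 3] for dropOldest.
}
```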

type Resumable

type Resumable interface {
	// Pause pauses a running node. A paused node can be resumed by calling
	// Resume method. Pause is idempotent and pausing a paused node shouldn't
	// fail. Pause may be called before a node runs. For example, when a node
	// is a source, Pause could be called before calling GenerateStream. In
	// that case, GenerateStream should not generate any tuple until Resume is
	// called.
	//
	// When Stop is called while the node is paused, the node must stop without
	// waiting for Resume.
	Pause(ctx *Context) error

	// Resume resumes a paused node. Resume is idempotent and resuming a running
	// node shouldn't fail. Resume may be called before a node runs.
	Resume(ctx *Context) error
}

Resumable is a node in a topology which can dynamically be paused and resumed at runtime.

type RewindableSource

type RewindableSource interface {
	Source
	Resumable

	// Rewind rewinds the stream the Source has. Rewind may be called
	// concurrently while GenerateStream is running. If Rewind is called while
	// the Source is being paused, the Source must not be resumed until
	// Resume is explicitly called. The Source doesn't have to care about
	// Pause/Resume if it doesn't implement Resumable.
	Rewind(ctx *Context) error
}

RewindableSource is a Source which can be rewound and generate the same stream from the beginning again (e.g. file based source).

Until Stop is called, RewindableSource must not return from GenerateStream after it has generated all tuples.

It must be resumable because it's impossible to provide rewinding without handling pause and resume appropriately.

func NewRewindableSource

func NewRewindableSource(s Source) RewindableSource

NewRewindableSource creates a rewindable source from a non-rewindable source. The source passed to this function must satisfy the following requirements:

  1. Its GenerateStream can safely be called multiple times.
  2. Its GenerateStream must return when ErrSourceRewound or ErrSourceStopped is returned from the Writer. It must return the same err instance returned from the writer.

It can be resumable, but its Pause and Resume won't be called. It doesn't have to implement Stop method (i.e. it can just return nil), either, although it has to provide it. Instead of implementing Stop method, it can return from GenerateStream when the writer returned ErrSourceStopped. If the Source has to clean up resources, it can implement Stop to do it. However, GenerateStream is still running while Stop is being called. Therefore, all resource allocation and deallocation should be done in GenerateStream rather than in an initialization function and Stop.

The interface returned from this function will support the following interfaces if the given source implements them:

  • Statuser

Known issue: There's one problem with NewRewindableSource. Stop method could block when the original source's GenerateStream doesn't generate any tuple (i.e. doesn't write any tuple) without returning from GenerateStream since whether the source is stopped is only determined by the error returned from Write.
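
The two requirements on the wrapped source can be sketched as follows. The writer interface, the error variables, and limitedWriter are simplified local stand-ins (the real Writer takes a Context and a Tuple). The point of the sketch is that generate can be called repeatedly and returns the writer's error unchanged, without wrapping it.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for ErrSourceRewound and ErrSourceStopped.
var (
	errSourceRewound = errors.New("the source has been rewound")
	errSourceStopped = errors.New("the source has been stopped")
)

// writer is a simplified stand-in for core.Writer.
type writer interface {
	Write(v int) error
}

// generate plays the role of GenerateStream: it emits n values from the
// beginning each time it is called and returns as soon as Write fails,
// handing back the very same error instance.
func generate(w writer, n int) error {
	for i := 0; i < n; i++ {
		if err := w.Write(i); err != nil {
			return err // not wrapped, so the caller can compare it directly
		}
	}
	return nil
}

// limitedWriter is a hypothetical writer that accepts limit values and then
// returns err, imitating the writer handed to the wrapped source.
type limitedWriter struct {
	limit int
	err   error
	got   []int
}

func (w *limitedWriter) Write(v int) error {
	if len(w.got) >= w.limit {
		return w.err
	}
	w.got = append(w.got, v)
	return nil
}

func main() {
	w := &limitedWriter{limit: 2, err: errSourceRewound}
	err := generate(w, 5)
	fmt.Println(err == errSourceRewound, w.got) // true [0 1]

	// After a rewind, the stream is generated again from the beginning.
	w2 := &limitedWriter{limit: 3, err: errSourceStopped}
	err = generate(w2, 5)
	fmt.Println(err == errSourceStopped, w2.got) // true [0 1 2]
}
```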

type SavableSharedState

type SavableSharedState interface {
	SharedState

	// Save writes data of the state to a given writer. Save receives parameters
	// which are used to customize the behavior of the method. Parameters are
	// defined by each component and there's no common definition.
	//
	// Save and other methods can be called concurrently.
	Save(ctx *Context, w io.Writer, params data.Map) error
}

SavableSharedState is a SharedState which can be persisted through Save method. Providing forward/backward compatibility of the saved file format is the responsibility of the author of the state.

Because the best way of implementing Load method depends on each SharedState, it doesn't always have to be provided with Save method.

type SharedState

type SharedState interface {
	// Terminate finalizes the state. The state can no longer be used after
	// this method is called. This method doesn't have to be idempotent.
	//
	// Write or other methods the actual instance has might be called after
	// Terminate method is called. When it occurs, they should return an error.
	// Also, Terminate and those methods can be called concurrently.
	Terminate(ctx *Context) error
}

SharedState is a state which nodes in a topology can access. It can be a machine learning model, a data structure for aggregation (like a histogram), configuration information for specific Boxes, and so on.

SharedState doesn't have methods to read its internal data because internal data representation heavily depends on each SharedState implementation. The easiest way to use SharedState from a component is to obtain the actual data type via a type assertion. See examples to learn more about how to use it.
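
A minimal sketch of the type-assertion approach. The sharedState interface is a simplified local stand-in (the real Terminate takes a Context), and counter, other, and asCounter are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sharedState is a simplified stand-in for core.SharedState.
type sharedState interface {
	Terminate() error
}

// counter is a hypothetical state that can be used concurrently.
type counter struct {
	mu sync.Mutex
	n  int
}

func (c *counter) Terminate() error { return nil }

// Add increments the counter and returns the new value.
func (c *counter) Add(d int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n += d
	return c.n
}

// other is a second hypothetical state of a different concrete type.
type other struct{}

func (other) Terminate() error { return nil }

// asCounter recovers the concrete type and fails cleanly when the state
// registered under a name is of another type.
func asCounter(s sharedState) (*counter, error) {
	c, ok := s.(*counter)
	if !ok {
		return nil, errors.New("state is not a *counter")
	}
	return c, nil
}

func main() {
	var s sharedState = &counter{}
	c, err := asCounter(s)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(c.Add(2)) // 2

	_, err = asCounter(other{})
	fmt.Println(err) // state is not a *counter
}
```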

If a SharedState also implements the Writer interface, it can be updated via SharedStateSink. Its Write method writes a tuple to the state. How tuples are processed depends on each SharedState. For example, a machine learning model might use a tuple as training data, and another state could compute the average of a specific field. Write may return fatal or temporary errors as Box.Process does. See the documentation of Box.Process for details.

Write method might be called after Terminate method is called. When it occurs, Write should return an error. Also, Write and Terminate can be called concurrently.

type SharedStateRegistry

type SharedStateRegistry interface {
	// Add adds a state to the registry. It fails if the registry already has
	// a state having the same name. Add also calls SharedState.Init. If it
	// fails Add returns an error and doesn't register the SharedState. The
	// caller doesn't have to call Terminate on failure.
	//
	// Don't add the same instance of SharedState more than once to registries.
	// Otherwise, Init and Terminate methods of the state will be called
	// multiple times.
	Add(name, typeName string, s SharedState) error

	// Get returns a SharedState having the name in the registry. It returns
	// NotExistError if the registry doesn't have the state.
	Get(name string) (SharedState, error)

	// Type returns a type of a SharedState. It returns NotExistError if the
	// registry doesn't have the state.
	Type(name string) (string, error)

	// Replace replaces the previous SharedState instance with a new instance.
	// The previous instance is returned on success if any. The previous state
	// will not be terminated by the registry and the caller must call
	// Terminate. The type name must be same as the previous state's type name.
	//
	// The given SharedState is terminated when it cannot be replaced.
	Replace(name, typeName string, s SharedState) (SharedState, error)

	// List returns a map containing all SharedState the registry has.
	// The map returned from this method can safely be modified.
	List() (map[string]SharedState, error)

	// Remove removes a SharedState the registry has. It automatically
	// terminates the state. If SharedState.Terminate failed, Remove returns an
	// error. However, even if it returns an error, the state is removed from
	// the registry.
	//
	// Remove also returns the removed SharedState if the registry has it. When
	// SharedState.Terminate fails, Remove returns both the removed SharedState
	// and an error. If the registry doesn't have a SharedState having the name,
	// it returns a nil SharedState and NotExistError.
	Remove(name string) (SharedState, error)
}

SharedStateRegistry manages SharedState with names assigned to each state.
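
The core semantics of Add, Get, and Remove can be sketched with a map guarded by a mutex. This is a simplified, hypothetical registry: it leaves out the type name, Replace, List, and the initialization step mentioned above, and it uses a local errNotExist value in place of NotExistError.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// state is a simplified stand-in for core.SharedState.
type state interface {
	Terminate() error
}

// errNotExist stands in for the error produced by NotExistError.
var errNotExist = errors.New("state does not exist")

// noopState is a hypothetical state that does nothing.
type noopState struct{}

func (noopState) Terminate() error { return nil }

type registry struct {
	mu     sync.RWMutex
	states map[string]state
}

func newRegistry() *registry {
	return &registry{states: map[string]state{}}
}

// Add fails when a state with the same name is already registered.
func (r *registry) Add(name string, s state) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.states[name]; ok {
		return fmt.Errorf("state %q already exists", name)
	}
	r.states[name] = s
	return nil
}

// Get returns errNotExist when the name is unknown.
func (r *registry) Get(name string) (state, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	s, ok := r.states[name]
	if !ok {
		return nil, errNotExist
	}
	return s, nil
}

// Remove deletes the state first and terminates it afterwards, so the state
// is gone from the registry even when Terminate returns an error.
func (r *registry) Remove(name string) (state, error) {
	r.mu.Lock()
	s, ok := r.states[name]
	delete(r.states, name)
	r.mu.Unlock()
	if !ok {
		return nil, errNotExist
	}
	return s, s.Terminate()
}

func main() {
	r := newRegistry()
	fmt.Println(r.Add("model", noopState{}))        // <nil>
	fmt.Println(r.Add("model", noopState{}) != nil) // true
	_, err := r.Get("missing")
	fmt.Println(err == errNotExist) // true
}
```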

func NewDefaultSharedStateRegistry

func NewDefaultSharedStateRegistry(ctx *Context) SharedStateRegistry

NewDefaultSharedStateRegistry creates a default registry of SharedStates.

type Sink

type Sink interface {
	WriteCloser
}

A Sink describes a location that data can be written to after it was processed by a topology, i.e., it represents an entity outside of the topology (e.g., a fluentd instance).

Write method may return fatal or temporary errors as Box.Process does. See the documentation of Box.Process for details.

func NewSharedStateSink

func NewSharedStateSink(ctx *Context, name string) (Sink, error)

NewSharedStateSink creates a sink that writes to SharedState.

type SinkConfig

type SinkConfig struct {
	// RemoveOnStop is a flag which controls what happens to the sink when it
	// stops. If it is true, the sink is removed from the topology when it stops.
	RemoveOnStop bool

	// Meta contains meta information of the sink. This field won't be used
	// by core package and application can store any form of information
	// related to the sink.
	Meta interface{}
}

SinkConfig has configuration parameters of a Sink node.

type SinkInputConfig

type SinkInputConfig struct {
	// Capacity is the maximum capacity (length) of input pipe. When this
	// parameter is 0, the default value is used. This parameter is only used
	// as a hint and doesn't guarantee that the pipe can actually have the
	// specified number of tuples.
	Capacity int

	// DropMode is a mode which controls the behavior of dropping tuples at the
	// output side of the queue when it is full.
	DropMode QueueDropMode
}

SinkInputConfig has parameters to customize input behavior of a Sink on each input pipe.

func (*SinkInputConfig) Validate added in v0.5.0

func (c *SinkInputConfig) Validate() error

Validate validates values of SinkInputConfig.

type SinkNode

type SinkNode interface {
	Node

	// Sink returns the internal Sink passed to Topology.AddSink.
	Sink() Sink

	// Input adds a new input from a Source or a Box. refname refers to the
	// name of the node from which the Sink wants to receive tuples. There must
	// be a Source or a Box having that name.
	Input(refname string, config *SinkInputConfig) error

	// EnableGracefulStop activates a graceful stop mode. If it is enabled,
	// Stop method waits until the Sink doesn't have an incoming tuple. The Sink
	// doesn't wait until, for example, a source generates all tuples. It only
	// waits for the moment when the Sink's input queue gets empty and stops
	// even if some inputs are about to send a new tuple to the Sink.
	EnableGracefulStop()

	// StopOnDisconnect tells the Sink that it may automatically stop when all
	// incoming connections (channels or pipes) are closed. After calling this
	// method, the Sink can automatically stop even if Stop method isn't
	// explicitly called.
	StopOnDisconnect()
}

SinkNode is a Sink registered to a topology.

type Source

type Source interface {
	// GenerateStream will start creating tuples and writing them to
	// the given Writer in a blocking way. (Start as a goroutine to start
	// the process in the background.) It will return when all tuples
	// have been written (in the case of a finite data source) or if
	// there was a severe error. The context that is passed in will be
	// used as a parameter to the Write method of the given Writer.
	GenerateStream(ctx *Context, w Writer) error

	// Stop will tell the Source to stop emitting tuples. After this
	// function returns, no more calls to Write shall be made on the
	// Writer passed in to GenerateStream.
	//
	// Stop could be called after GenerateStream returns. However,
	// it's guaranteed that Stop won't be called more than once by
	// components in SensorBee's core package.
	//
	// Stop won't be called if GenerateStream wasn't called.
	Stop(ctx *Context) error
}

A Source describes an entity that inserts data into a topology from the outside world (e.g., a fluentd instance).
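
The contract above can be illustrated with a minimal sketch. It is not taken from the core package: Context, Tuple, and Writer below are local stand-ins that only mirror the documented shapes, and counterSource, newCounterSource, and sliceWriter are hypothetical names. The sketch assumes GenerateStream is called at most once per instance, and it relies on the documented guarantee that Stop is not called unless GenerateStream was called.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins mirroring the documented shapes; NOT the core package types.
type Context struct{}

type Tuple struct {
	Data map[string]interface{}
}

type Writer interface {
	Write(ctx *Context, t *Tuple) error
}

// counterSource is a hypothetical finite source emitting 0..n-1.
type counterSource struct {
	n    int
	stop chan struct{} // closed by Stop to request termination
	done chan struct{} // closed when GenerateStream returns
	once sync.Once
}

func newCounterSource(n int) *counterSource {
	return &counterSource{n: n, stop: make(chan struct{}), done: make(chan struct{})}
}

// GenerateStream blocks until every tuple is written, Stop is requested,
// or the Writer returns an error.
func (s *counterSource) GenerateStream(ctx *Context, w Writer) error {
	defer close(s.done)
	for i := 0; i < s.n; i++ {
		select {
		case <-s.stop:
			return nil
		default:
		}
		t := &Tuple{Data: map[string]interface{}{"value": i}}
		if err := w.Write(ctx, t); err != nil {
			return err
		}
	}
	return nil
}

// Stop requests termination and waits for GenerateStream to return, so no
// Write happens after Stop returns. It would block forever if GenerateStream
// were never called, which the documented contract rules out.
func (s *counterSource) Stop(ctx *Context) error {
	s.once.Do(func() { close(s.stop) })
	<-s.done
	return nil
}

// sliceWriter collects the written values for inspection.
type sliceWriter struct{ values []int }

func (w *sliceWriter) Write(ctx *Context, t *Tuple) error {
	w.values = append(w.values, t.Data["value"].(int))
	return nil
}

func main() {
	ctx := &Context{}
	src := newCounterSource(3)
	w := &sliceWriter{}
	if err := src.GenerateStream(ctx, w); err != nil {
		fmt.Println("error:", err)
		return
	}
	_ = src.Stop(ctx) // calling Stop after GenerateStream returns is allowed
	fmt.Println(w.values)
}
```

Closing a channel inside sync.Once keeps Stop safe even if a caller outside the core package invokes it more than once.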

func ImplementSourceStop

func ImplementSourceStop(s Source) Source

ImplementSourceStop implements Stop method of a Source in a thread-safe manner on behalf of the given Source. Source passed to this function must follow the rule described in NewRewindableSource with one exception that the Writer doesn't return ErrSourceRewound. The source returned from this function isn't rewindable even if the original Source is compatible with RewindableSource interface.

func NewDroppedTupleCollectorSource

func NewDroppedTupleCollectorSource() Source

NewDroppedTupleCollectorSource returns a source which generates a stream containing tuples dropped by other nodes. Tuples generated from this source won't be reported again even if they're dropped later on. This is done by setting TFDropped flag to the dropped tuple. Therefore, if a Box forgets to copy the flag when it emits a tuple derived from the dropped one, the tuple can infinitely be reported again and again.

Tuples generated from this source have the following fields in Data:

  • node_type: the type of the node which dropped the tuple
  • node_name: the name of the node which dropped the tuple
  • event_type: the type of the event indicating when the tuple was dropped
  • error (optional): the error information if any
  • data: the original content that the dropped tuple had
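
As a rough illustration of the field layout listed above, the sketch below builds such a map with a hypothetical helper, droppedTupleData. The helper is not part of the core package, a plain Go map stands in for data.Map, the sample values ("box", "my_box", "output") are invented, and storing the error as a string is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errExample is a sample error used only for this illustration.
var errExample = errors.New("queue is full")

// droppedTupleData is a hypothetical helper, not part of the core package.
// It assembles the documented fields and omits "error" when err is nil.
func droppedTupleData(nodeType, nodeName, eventType string, err error,
	original map[string]interface{}) map[string]interface{} {
	m := map[string]interface{}{
		"node_type":  nodeType,
		"node_name":  nodeName,
		"event_type": eventType,
		"data":       original,
	}
	if err != nil {
		m["error"] = err.Error() // assumption: the error is stored as a string
	}
	return m
}

func main() {
	original := map[string]interface{}{"v": 1}
	m := droppedTupleData("box", "my_box", "output", errExample, original)
	fmt.Println(m["node_name"], m["error"])
}
```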

type SourceConfig

type SourceConfig struct {
	// PausedOnStartup is a flag which indicates the initial state of the
	// source. If it is true, the source is paused. Otherwise, source runs just
	// after it is added to a topology.
	PausedOnStartup bool

	// RemoveOnStop is a flag which indicates whether the source is removed
	// from the topology when it stops. If it is true, the source is removed.
	RemoveOnStop bool

	// Meta contains meta information of the source. This field won't be used
	// by core package and application can store any form of information
	// related to the source.
	Meta interface{}
}

SourceConfig has configuration parameters of a Source node.

type SourceNode

type SourceNode interface {
	Node

	// Source returns internal source passed to Topology.AddSource.
	Source() Source

	// Pause pauses a running source. A paused source can be resumed by calling
	// Resume method. Pause is idempotent.
	Pause() error

	// Resume resumes a paused source. Resume is idempotent.
	Resume() error

	// Rewind rewinds the stream if the Source supports it. Rewind doesn't
	// resume the stream if the Source is paused. Call Resume explicitly to
	// resume a paused Source after Rewind.
	//
	// Rewind returns an error if the Source doesn't support Rewind, or the
	// node is already stopped.
	Rewind() error

	// StopOnDisconnect tells the Source that it may automatically stop when all
	// outbound connections (channels or pipes) are closed. After calling this
	// method, the Source can automatically stop even if Stop method isn't
	// explicitly called.
	StopOnDisconnect()
}

SourceNode is a Source registered to a topology.

type StatefulBox

type StatefulBox interface {
	Box

	// Init is called on each Box in a Topology when Topology.AddBox is called.
	// It can be used to keep a reference to the Context object or initialize
	// other forms of state.
	//
	// When the same instance of the box is added to a topology more than
	// once, Init will be called multiple times.
	Init(ctx *Context) error

	// Terminate finalizes a Box. The Box can no longer be used after
	// this method is called. This method doesn't have to be idempotent,
	// that is the behavior is undefined when this method is called
	// more than once. Terminate isn't called if Init fails on the Box.
	//
	// As long as the Box is used by components in core package, no method
	// will be called after the invocation of Terminate method. In addition,
	// the Box may assume that Terminate is not called concurrently with
	// any other methods.
	//
	// When the same instance of the box is added to a topology more than
	// once, Terminate will be called multiple times. In that case, use
	// something like a reference counter to avoid deallocating resources
	// before the Box is actually removed from a topology.
	Terminate(ctx *Context) error
}

StatefulBox is a Box having an internal state that needs to be initialized before it's used by a topology. Because a Box can be implemented in C or C++, a Terminate method is also provided to deallocate resources it used during processing.

An instance of StatefulBox shouldn't be added to a topology or a topology builder more than once if it doesn't handle duplicated initialization and termination correctly (i.e. with something like a reference counter).
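
The reference-counter approach mentioned above might look like the following sketch. refCountedBox and its allocated helper are hypothetical, Context is a local stand-in, and the Box methods are omitted. Only the Init and Terminate bookkeeping is shown.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Context is a local stand-in, NOT the core package type.
type Context struct{}

// refCountedBox is a hypothetical box that tolerates being added to a
// topology several times by counting Init and Terminate calls.
type refCountedBox struct {
	mu       sync.Mutex
	refs     int
	resource []byte // stands in for any resource that must be released
}

// Init allocates the resource only on the first call.
func (b *refCountedBox) Init(ctx *Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.refs == 0 {
		b.resource = make([]byte, 16)
	}
	b.refs++
	return nil
}

// Terminate releases the resource only when the last reference goes away.
func (b *refCountedBox) Terminate(ctx *Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.refs == 0 {
		return errors.New("terminate called without a matching init")
	}
	b.refs--
	if b.refs == 0 {
		b.resource = nil
	}
	return nil
}

// allocated reports whether the resource is currently held.
func (b *refCountedBox) allocated() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.resource != nil
}

func main() {
	ctx := &Context{}
	b := &refCountedBox{}
	_ = b.Init(ctx)
	_ = b.Init(ctx) // same instance registered a second time
	_ = b.Terminate(ctx)
	fmt.Println("after first Terminate:", b.allocated())
	_ = b.Terminate(ctx)
	fmt.Println("after second Terminate:", b.allocated())
}
```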

type Statuser

type Statuser interface {
	// Status returns the status of the component. Formats of statuses are not
	// strictly defined and each component can provide any information it has.
	// Node.Status has a document to show what information it returns.
	Status() data.Map
}

Statuser is an interface which provides Status method to retrieve its status.

type Topology

type Topology interface {
	// Name returns the name of this topology.
	Name() string

	// Context returns the Context tied to the topology. It isn't always safe
	// to modify fields of the Context. However, some fields of it can handle
	// concurrent access properly. See the document of Context for details.
	Context() *Context

	// AddSource adds a Source to the topology. It will asynchronously call
	// source's GenerateStream and returns after the source becomes ready.
	// GenerateStream could be called lazily to avoid unnecessary computation.
	// GenerateStream and Stop might also be called when this method returns
	// an error. The caller must not call GenerateStream or Stop of the source.
	AddSource(name string, s Source, config *SourceConfig) (SourceNode, error)

	// AddBox adds a Box to the topology. It returns BoxNode and the
	// caller can configure inputs or other settings of the Box node through it.
	//
	// Don't add the same instance of a Box more than once even if they have
	// different names. However, if a Box has a reference counter and
	// its initialization and termination are done exactly once at proper
	// timing, it can be added multiple times when a builder supports duplicated
	// registration of the same instance of a Box.
	AddBox(name string, b Box, config *BoxConfig) (BoxNode, error)

	// AddSink adds a Sink to the topology. It returns SinkNode and the
	// caller can configure inputs or other settings of the Sink node through it.
	AddSink(name string, s Sink, config *SinkConfig) (SinkNode, error)

	// Remove removes a node from the topology. It returns NotExistError when
	// the topology couldn't find a node having the name. The removed node is
	// stopped by the topology and the Remove method blocks until the node actually
	// stops.
	Remove(name string) error

	// Stop stops the topology. If the topology doesn't have a cycle, it stops
	// after all tuples generated from Sources at the time of the invocation
	// are written into Sinks. Stop method returns after processing all the
	// tuples.
	//
	// BUG: Currently Stop method doesn't work if the topology has a cycle.
	Stop() error

	// State returns the current state of the topology. The topology's state
	// is independent of the states of its nodes.
	State() TopologyStateHolder

	// Node returns a node registered to the topology. It returns NotExistError
	// when the topology doesn't have the node.
	Node(name string) (Node, error)

	// Nodes returns all nodes registered to the topology. The map returned
	// from this method can safely be modified.
	Nodes() map[string]Node

	// Source returns a source registered to the topology. It returns
	// NotExistError when the topology doesn't have the source.
	Source(name string) (SourceNode, error)

	// Sources returns all sources registered to the topology. The map returned
	// from this method can safely be modified.
	Sources() map[string]SourceNode

	// Box returns a box registered to the topology. It returns NotExistError
	// when the topology doesn't have the box.
	Box(name string) (BoxNode, error)

	// Boxes returns all boxes registered to the topology. The map returned
	// from this method can safely be modified.
	Boxes() map[string]BoxNode

	// Sink returns a sink registered to the topology. It returns NotExistError
	// when the topology doesn't have the sink.
	Sink(name string) (SinkNode, error)

	// Sinks returns all sinks registered to the topology. The map returned
	// from this method can safely be modified.
	Sinks() map[string]SinkNode
}

Topology is a topology which can add Sources, Boxes, and Sinks dynamically. Boxes and Sinks can also add inputs dynamically from running Sources or Boxes.

func NewDefaultTopology

func NewDefaultTopology(ctx *Context, name string) (Topology, error)

NewDefaultTopology creates a topology having a simple graph structure.

type TopologyState

type TopologyState int

TopologyState represents a status of a topology or a node.

const (
	// TSInitialized means that a topology or a node is just initialized and
	// ready to be run.
	TSInitialized TopologyState = iota

	// TSStarting means a topology or a node is now booting itself and will run
	// shortly.
	TSStarting

	// TSRunning means a topology or a node is currently running and emitting
	// tuples to sinks.
	TSRunning

	// TSPaused means a topology or a node has temporarily stopped emitting
	// tuples and can be resumed later.
	TSPaused

	// TSStopping means a topology or a node is stopping all sources and closing
	// channels between sources, boxes, and sinks.
	TSStopping

	// TSStopped means a topology or a node is stopped. A stopped topology
	// doesn't have to be able to run again.
	TSStopped
)

func (TopologyState) String

func (s TopologyState) String() string

type TopologyStateHolder

type TopologyStateHolder interface {
	// Get returns the current state of a topology or a node.
	Get() TopologyState

	// Wait waits until the topology or the node has the specified state. It
	// returns the current state. The current state may differ from the given
	// state, but it's guaranteed that the current state is a successor of
	// the given state. For example, when Wait(TSStarting) is called, TSRunning
	// or TSStopped can be returned.
	Wait(s TopologyState) TopologyState
}

TopologyStateHolder is an interface for safely referring to the state of a topology or a node. It only provides read-only methods.
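
One way to picture the Wait semantics is the sketch below, built on sync.Cond. stateHolder, newStateHolder, and set are hypothetical names, and the sketch makes a simplifying assumption: a state counts as a "successor" of another when its numeric value is greater, which ignores transitions such as resuming from TSPaused back to TSRunning.

```go
package main

import (
	"fmt"
	"sync"
)

// TopologyState and its constants are redeclared locally for this sketch.
type TopologyState int

const (
	TSInitialized TopologyState = iota
	TSStarting
	TSRunning
	TSPaused
	TSStopping
	TSStopped
)

// stateHolder is a hypothetical holder; it is NOT the core implementation.
type stateHolder struct {
	mu    sync.Mutex
	cond  *sync.Cond
	state TopologyState
}

func newStateHolder() *stateHolder {
	h := &stateHolder{state: TSInitialized}
	h.cond = sync.NewCond(&h.mu)
	return h
}

// Get returns the current state.
func (h *stateHolder) Get() TopologyState {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.state
}

// set changes the state and wakes up every waiter.
func (h *stateHolder) set(s TopologyState) {
	h.mu.Lock()
	h.state = s
	h.mu.Unlock()
	h.cond.Broadcast()
}

// Wait blocks until the state is s or a later one, then returns the state
// observed at that moment, which may already be past s.
func (h *stateHolder) Wait(s TopologyState) TopologyState {
	h.mu.Lock()
	defer h.mu.Unlock()
	for h.state < s {
		h.cond.Wait()
	}
	return h.state
}

func main() {
	h := newStateHolder()
	go func() {
		h.set(TSStarting)
		h.set(TSRunning)
	}()
	got := h.Wait(TSRunning)
	fmt.Println(got == TSRunning)
}
```

Callers should therefore compare the returned state rather than assume it equals the requested one.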

type TraceEvent

type TraceEvent struct {
	// Timestamp is the time of the event.
	Timestamp time.Time

	// Type represents the type of the event. For transitions, the viewpoint
	// of the Tuple should be assumed. For example, when a Tuple is emitted
	// by a Source, this is an OUTPUT transition; when it enters a Box for
	// processing, this is an INPUT transition. The OTHER Type can be used
	// to add other tracing information.
	Type EventType

	// Msg is any message, but for transitions it makes sense to use the
	// name of the Source/Box/Sink that was left/entered.
	Msg string
}

A TraceEvent represents an event in the processing lifecycle of a Tuple, in particular transitions from one processing unit to the next.

type Tuple

type Tuple struct {
	// Data is the actual data that is processed.
	Data data.Map

	// InputName can be used to identify the sender of a tuple when a
	// Box processes data from multiple inputs. It will be set before
	// Box.Process is called. Also see BoxDeclarer.NamedInput.
	InputName string

	// Timestamp is the time when this tuple was originally generated,
	// e.g., the timestamp of a camera image or a sensor-emitted value.
	// It is an integral part of the data and should not be changed.
	// It should be saved along with the Data when persisted to a data
	// store so that timestamp-based reprocessing can be done with the
	// same results at a later point in time.
	Timestamp time.Time

	// ProcTimestamp is the time when this tuple entered the topology
	// for processing. It should be set by the Source that emitted this
	// Tuple.
	ProcTimestamp time.Time

	// BatchID is reserved for future use.
	BatchID int64

	// Flags has bit flags which control behavior of this tuple. When a Box
	// emits a tuple derived from a received one, it must copy this field
	// otherwise a problem like infinite reporting of a dropped tuple could
	// occur.
	Flags TupleFlags

	// Trace is used during debugging to trace the way of a Tuple through
	// a topology. See the documentation for TraceEvent.
	Trace []TraceEvent
}

Tuple is a fundamental data structure in SensorBee. All data that is processed is stored in tuples.

func NewTuple

func NewTuple(d data.Map) *Tuple

NewTuple creates and initializes a Tuple with default values. Data is set from the argument, and the Timestamp and ProcTimestamp fields are set to the value of time.Now().

func (*Tuple) AddEvent

func (t *Tuple) AddEvent(ev TraceEvent)

AddEvent adds a TraceEvent to this Tuple's trace. This is not thread-safe because it is assumed a Tuple is only processed by one unit at a time.

func (*Tuple) Copy

func (t *Tuple) Copy() *Tuple

Copy creates a deep copy of a Tuple, including the contained data. This can be used, e.g., by fan-out pipes. When Tuple.Data doesn't need to be cloned, call ShallowCopy. NEVER do newTuple := *oldTuple.

func (*Tuple) ShallowCopy

func (t *Tuple) ShallowCopy() *Tuple

ShallowCopy creates a new copy of a tuple. It only deep copies trace information. Because Data is shared between the old tuple and the new tuple, this method sets TFSharedData flag for both tuples. However, the tuple itself returned from this method isn't shared and its TFShared flag isn't set.
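
The difference between the two copy methods can be sketched with local stand-ins. The Tuple type below is simplified (a plain map for Data, strings for Trace), shallowCopy and deepCopy are hypothetical functions rather than the package's methods, and deepCopy clones only one level of the map. A plain struct assignment such as newTuple := *oldTuple copies only the map and slice headers, so both tuples would silently alias the same Data and Trace without any flag recording it.

```go
package main

import "fmt"

// TupleFlags and its constants are redeclared locally for this sketch.
type TupleFlags uint32

const (
	TFDropped TupleFlags = 1 << iota
	TFShared
	TFSharedData
)

// Tuple is a simplified stand-in, NOT the core package type.
type Tuple struct {
	Data  map[string]interface{}
	Flags TupleFlags
	Trace []string // simplified stand-in for []TraceEvent
}

// shallowCopy shares Data, copies Trace, marks Data as shared on both
// tuples, and leaves TFShared unset on the new tuple.
func shallowCopy(t *Tuple) *Tuple {
	t.Flags |= TFSharedData
	return &Tuple{
		Data:  t.Data,
		Flags: t.Flags &^ TFShared,
		Trace: append([]string(nil), t.Trace...),
	}
}

// deepCopy clones Data (one level only in this sketch) and Trace, so the
// result carries neither sharing flag.
func deepCopy(t *Tuple) *Tuple {
	d := make(map[string]interface{}, len(t.Data))
	for k, v := range t.Data {
		d[k] = v
	}
	return &Tuple{
		Data:  d,
		Flags: t.Flags &^ (TFShared | TFSharedData),
		Trace: append([]string(nil), t.Trace...),
	}
}

func main() {
	orig := &Tuple{Data: map[string]interface{}{"x": 1}, Trace: []string{"a"}}
	s := shallowCopy(orig)
	d := deepCopy(s)
	d.Data["x"] = 2 // safe: d owns its Data
	fmt.Println(orig.Data["x"], s.Flags&TFSharedData != 0, d.Flags&TFSharedData != 0)
}
```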

type TupleFlags

type TupleFlags uint32

TupleFlags has flags which control behavior of a tuple.

const (
	// TFDropped is a flag which is set when a tuple is dropped. Once this flag
	// is set to a tuple, the tuple will not be reported when it is dropped.
	TFDropped TupleFlags = 1 << iota

	// TFShared is a flag which is set when a Tuple is shared by multiple nodes.
	// In other words, this flag is set if a tuple is referenced by multiple
	// pointers (i.e. *Tuple). This flag only indicates that Tuple struct itself
	// is shared. Tuple.Data might be shared even if this flag isn't set.
	//
	// To update a field of a tuple with TFShared flag, use ShallowCopy() or
	// Copy(). Also, if you keep a reference to a tuple or (parts of) its Data
	// after processing it, you must set the TFShared flag for that tuple.
	TFShared

	// TFSharedData is a flag which is set when Tuple.Data is shared by other
	// tuples. Tuple.Data must not be directly modified if the flag is set.
	// To update Data of a tuple with TFSharedData flag, use Copy().
	//
	// Relations of TFShared and TFSharedData are summarized below:
	//
	//	(TFShared is set, TFSharedData is set):
	//	(true, true): *Tuple is copied
	//	(true, false): never happens
	//	(false, true): a tuple returned from ShallowCopy
	//	(false, false): a tuple returned from NewTuple or Copy
	TFSharedData
)

func (*TupleFlags) Clear

func (f *TupleFlags) Clear(v TupleFlags)

Clear clears a set of flags at once.

func (*TupleFlags) IsSet

func (f *TupleFlags) IsSet(v TupleFlags) bool

IsSet returns true if all the given flags are set.

func (*TupleFlags) Set

func (f *TupleFlags) Set(v TupleFlags)

Set sets a set of flags at once.
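
The three methods can be pictured as ordinary bit operations. The sketch below redeclares TupleFlags locally and uses plain, non-atomic operations. Whether the core package synchronizes these operations is not stated here, so treat this only as an illustration of the documented semantics, in particular that IsSet requires every given flag to be present.

```go
package main

import "fmt"

// TupleFlags is redeclared locally; this is a non-atomic sketch.
type TupleFlags uint32

const (
	TFDropped TupleFlags = 1 << iota
	TFShared
	TFSharedData
)

// Set turns on every flag in v.
func (f *TupleFlags) Set(v TupleFlags) { *f |= v }

// Clear turns off every flag in v.
func (f *TupleFlags) Clear(v TupleFlags) { *f &^= v }

// IsSet reports whether all flags in v are on.
func (f *TupleFlags) IsSet(v TupleFlags) bool { return *f&v == v }

func main() {
	var f TupleFlags
	f.Set(TFDropped | TFSharedData)
	fmt.Println(f.IsSet(TFDropped|TFSharedData), f.IsSet(TFDropped|TFShared))
	f.Clear(TFDropped)
	fmt.Println(f.IsSet(TFDropped), f.IsSet(TFSharedData))
}
```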

type Updater

type Updater interface {
	// Update updates the configuration parameters of this entity.
	// It is the updater's responsibility to check the validity
	// (e.g., data type and value) of the parameters.
	Update(ctx *Context, params data.Map) error
}

Updater represents an entity that can update its configuration parameters (in particular SourceNode, SinkNode and SharedState instances).
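
A minimal sketch of the validation responsibility described above follows. Context and Map are local stand-ins (Map replaces data.Map), and rateLimiter with its interval_ms parameter is a hypothetical entity invented for illustration. The point is that Update checks both type and value before changing any state.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins, NOT the core or data package types.
type Context struct{}

type Map map[string]interface{}

// rateLimiter is a hypothetical entity with one parameter, "interval_ms".
type rateLimiter struct{ intervalMs int }

// Update validates the parameter's presence, type, and value, and applies
// it only after every check has passed.
func (r *rateLimiter) Update(ctx *Context, params Map) error {
	v, ok := params["interval_ms"]
	if !ok {
		return errors.New("interval_ms is required")
	}
	n, ok := v.(int)
	if !ok {
		return fmt.Errorf("interval_ms must be an int, got %T", v)
	}
	if n <= 0 {
		return fmt.Errorf("interval_ms must be positive, got %d", n)
	}
	r.intervalMs = n
	return nil
}

func main() {
	r := &rateLimiter{intervalMs: 100}
	err := r.Update(&Context{}, Map{"interval_ms": "fast"})
	fmt.Println(err != nil, r.intervalMs) // rejected update leaves state intact
	err = r.Update(&Context{}, Map{"interval_ms": 250})
	fmt.Println(err == nil, r.intervalMs)
}
```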

type WriteCloser

type WriteCloser interface {
	Writer

	// Close closes the writer. An appropriate Context should be given,
	// which is usually provided by Topology. Close doesn't have to be
	// idempotent.
	Close(ctx *Context) error
}

WriteCloser adds the capability of closing to Writer.

type Writer

type Writer interface {
	Write(ctx *Context, t *Tuple) error
}

Writer describes an object that tuples can be written to as the output for a Box. Note that this interface was chosen because it also allows a Box to write multiple (or no) output tuples. It is expected that the ctx pointer passed in points to the same Context that was used by the Box that called Write.

func WriterFunc

func WriterFunc(f func(ctx *Context, t *Tuple) error) Writer

WriterFunc creates a Writer from a function.
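
A function-to-interface adapter of this kind is commonly written in the style of http.HandlerFunc. The sketch below assumes that approach. Context, Tuple, and Writer are local stand-ins, and writerFunc and newWriterFunc are hypothetical names that only mirror the documented signature.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented shapes; NOT the core package types.
type Context struct{}

type Tuple struct {
	Data map[string]interface{}
}

type Writer interface {
	Write(ctx *Context, t *Tuple) error
}

// writerFunc is a function type that satisfies Writer by calling itself.
type writerFunc func(ctx *Context, t *Tuple) error

func (f writerFunc) Write(ctx *Context, t *Tuple) error { return f(ctx, t) }

// newWriterFunc mirrors the documented signature of WriterFunc.
func newWriterFunc(f func(ctx *Context, t *Tuple) error) Writer {
	return writerFunc(f)
}

func main() {
	count := 0
	w := newWriterFunc(func(ctx *Context, t *Tuple) error {
		count++ // the closure can capture state without a dedicated type
		return nil
	})
	ctx := &Context{}
	_ = w.Write(ctx, &Tuple{})
	_ = w.Write(ctx, &Tuple{})
	fmt.Println(count)
}
```

This is convenient in tests, where a closure can record the tuples a Box emits.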
