Documentation ¶
Overview ¶
Package edge provides mechanisms for message passing along edges. Several composable interfaces are defined to aid in implementing a node which consumes messages from an edge.
Index ¶
- Variables
- func Forward(outs []StatsEdge, msg Message) error
- type BarrierMessage
- type BatchBuffer
- type BatchPointMessage
- type BatchPointMessages
- type BeginBatchMessage
- type BufferedBatchMessage
- type BufferedBatchMessageDecoder
- type BufferedReceiver
- type Consumer
- type DeleteGroupMessage
- type DimensionGetter
- type DimensionSetter
- type Edge
- type EndBatchMessage
- type FieldGetter
- type FieldSetter
- type FieldsTagsTimeGetter
- type FieldsTagsTimeGetterMessage
- type FieldsTagsTimeSetter
- type ForwardBufferedReceiver
- type ForwardReceiver
- type GroupIDGetter
- type GroupInfo
- type GroupInfoer
- type GroupStats
- type GroupedConsumer
- type GroupedReceiver
- type Message
- type MessageType
- type MultiReceiver
- type NameGetter
- type NameSetter
- type PointMessage
- type PointMeta
- type Receiver
- type StatsEdge
- type TagGetter
- type TagSetter
- type TimeGetter
- type TimeSetter
Constants ¶
This section is empty.
Variables ¶
var ErrAborted = errors.New("edge aborted")
ErrAborted is returned from the Edge interface when operations are performed on the edge after it has been aborted.
Functions ¶
Types ¶
type BarrierMessage ¶
type BarrierMessage interface { Message ShallowCopy() BarrierMessage TimeSetter }
BarrierMessage indicates that no data older than the barrier time will arrive.
func NewBarrierMessage ¶
func NewBarrierMessage(time time.Time) BarrierMessage
type BatchBuffer ¶
type BatchBuffer struct {
// contains filtered or unexported fields
}
BatchBuffer buffers batch messages into a BufferedBatchMessage.
func (*BatchBuffer) BatchPoint ¶
func (r *BatchBuffer) BatchPoint(bp BatchPointMessage) error
func (*BatchBuffer) BeginBatch ¶
func (r *BatchBuffer) BeginBatch(begin BeginBatchMessage) error
func (*BatchBuffer) BufferedBatchMessage ¶
func (r *BatchBuffer) BufferedBatchMessage(end EndBatchMessage) BufferedBatchMessage
type BatchPointMessage ¶
type BatchPointMessage interface { Message ShallowCopy() BatchPointMessage FieldsTagsTimeSetter }
BatchPointMessage is a single point in a batch of data.
func BatchPointFromPoint ¶
func BatchPointFromPoint(p PointMessage) BatchPointMessage
func NewBatchPointMessage ¶
type BatchPointMessages ¶
type BatchPointMessages []BatchPointMessage
func (BatchPointMessages) Len ¶
func (l BatchPointMessages) Len() int
func (BatchPointMessages) Swap ¶
func (l BatchPointMessages) Swap(i int, j int)
type BeginBatchMessage ¶
type BeginBatchMessage interface { Message ShallowCopy() BeginBatchMessage NameSetter GroupInfoer TagSetter DimensionSetter SetTagsAndDimensions(models.Tags, models.Dimensions) // Time is the maximum time of any point in the batch TimeSetter // SizeHint provides a hint about the size of the batch to come. // If non-zero expect a batch with SizeHint points, // otherwise an unknown number of points are coming. SizeHint() int SetSizeHint(int) }
BeginBatchMessage marks the beginning of a batch of points. Once a BeginBatchMessage is received, all subsequent messages will be BatchPointMessages until an EndBatchMessage is received.
func NewBeginBatchMessage ¶
type BufferedBatchMessage ¶
type BufferedBatchMessage interface { Message ShallowCopy() BufferedBatchMessage Begin() BeginBatchMessage SetBegin(BeginBatchMessage) // Expose common read interfaces of begin and point messages. PointMeta Points() []BatchPointMessage SetPoints([]BatchPointMessage) End() EndBatchMessage SetEnd(EndBatchMessage) ToResult() models.Result ToRow() *models.Row }
BufferedBatchMessage is a message containing all data for a single batch.
func NewBufferedBatchMessage ¶
func NewBufferedBatchMessage( begin BeginBatchMessage, points []BatchPointMessage, end EndBatchMessage, ) BufferedBatchMessage
func ResultToBufferedBatches ¶
func ResultToBufferedBatches(res influxdb.Result, groupByName bool) ([]BufferedBatchMessage, error)
type BufferedBatchMessageDecoder ¶
type BufferedBatchMessageDecoder interface { Decode() (BufferedBatchMessage, error) More() bool }
func NewBufferedBatchMessageDecoder ¶
func NewBufferedBatchMessageDecoder(r io.Reader) BufferedBatchMessageDecoder
type BufferedReceiver ¶
type BufferedReceiver interface { Receiver // BufferedBatch processes an entire buffered batch. // Do not modify the batch or the slice of Points as it is shared. BufferedBatch(batch BufferedBatchMessage) error }
type Consumer ¶
type Consumer interface { // Consume reads messages off an edge until the edge is closed or aborted. // An error is returned if either the edge or receiver errors. Consume() error }
Consumer reads messages off an edge and passes them to a receiver.
func NewConsumerWithReceiver ¶
NewConsumerWithReceiver creates a new consumer for the edge e and receiver r.
func NewMultiConsumer ¶
func NewMultiConsumer(ins []Edge, r MultiReceiver) Consumer
func NewMultiConsumerWithStats ¶
func NewMultiConsumerWithStats(ins []StatsEdge, r MultiReceiver) Consumer
type DeleteGroupMessage ¶
type DeleteGroupMessage interface { Message GroupIDGetter }
type DimensionGetter ¶
type DimensionGetter interface {
Dimensions() models.Dimensions
}
type DimensionSetter ¶
type DimensionSetter interface { DimensionGetter SetDimensions(models.Dimensions) }
type Edge ¶
type Edge interface { // Collect instructs the edge to accept a new message. Collect(Message) error // Emit blocks until a message is available and returns it or returns false if the edge has been closed or aborted. Emit() (Message, bool) // Close stops the edge, all messages currently buffered will be processed. // Future calls to Collect will panic. Close() error // Abort immediately stops the edge and all currently buffered messages are dropped. // Future calls to Collect return the error ErrAborted. Abort() // Type indicates whether the edge will emit stream or batch data. Type() pipeline.EdgeType }
Edge represents the connection between two nodes that communicate via messages. Edge communication is unidirectional and asynchronous. Edges are safe for concurrent use.
type EndBatchMessage ¶
type EndBatchMessage interface { Message ShallowCopy() EndBatchMessage }
EndBatchMessage indicates that all points for a batch have arrived.
func NewEndBatchMessage ¶
func NewEndBatchMessage() EndBatchMessage
type FieldGetter ¶
type FieldSetter ¶
type FieldSetter interface { FieldGetter SetFields(models.Fields) }
type FieldsTagsTimeGetter ¶
type FieldsTagsTimeGetter interface { FieldGetter TagGetter TimeGetter }
type FieldsTagsTimeGetterMessage ¶
type FieldsTagsTimeGetterMessage interface { Message FieldsTagsTimeGetter }
type FieldsTagsTimeSetter ¶
type FieldsTagsTimeSetter interface { FieldSetter TagSetter TimeSetter }
type ForwardBufferedReceiver ¶
type ForwardBufferedReceiver interface { ForwardReceiver BufferedBatch(batch BufferedBatchMessage) (Message, error) }
ForwardBufferedReceiver handles messages as they arrive and can return a message to be forwarded to output edges. If a returned message is nil, no message is forwarded.
type ForwardReceiver ¶
type ForwardReceiver interface { BeginBatch(begin BeginBatchMessage) (Message, error) BatchPoint(bp BatchPointMessage) (Message, error) EndBatch(end EndBatchMessage) (Message, error) Point(p PointMessage) (Message, error) Barrier(b BarrierMessage) (Message, error) DeleteGroup(d DeleteGroupMessage) (Message, error) }
ForwardReceiver handles messages as they arrive and can return a message to be forwarded to output edges. If a returned message is nil, no message is forwarded.
func NewTimedForwardReceiver ¶
func NewTimedForwardReceiver(t timer.Timer, r ForwardReceiver) ForwardReceiver
NewTimedForwardReceiver creates a forward receiver which measures the time spent in r.
type GroupIDGetter ¶
type GroupInfoer ¶
type GroupInfoer interface { GroupIDGetter GroupInfo() GroupInfo }
type GroupStats ¶
GroupStats represents the statistics for a specific group.
type GroupedConsumer ¶
type GroupedConsumer interface { Consumer // CardinalityVar is an exported var that indicates the current number of groups being managed. CardinalityVar() expvar.IntVar }
GroupedConsumer reads messages off an edge and passes them by group to receivers created from a grouped receiver.
func NewGroupedConsumer ¶
func NewGroupedConsumer(e Edge, r GroupedReceiver) GroupedConsumer
NewGroupedConsumer creates a new grouped consumer for edge e and grouped receiver r.
type GroupedReceiver ¶
type GroupedReceiver interface { // NewGroup signals that a new group has been discovered in the data. // Information on the group and the message that first triggered its creation are provided. NewGroup(group GroupInfo, first PointMeta) (Receiver, error) }
GroupedReceiver creates and deletes receivers as groups are created and deleted.
type Message ¶
type Message interface { // Type returns the type of the message. Type() MessageType }
Message represents data to be passed along an edge. Messages can be shared across many contexts.
All messages implement their own ShallowCopy method. All ShallowCopy methods create a copy of the message but do not deeply copy any reference types.
Never mutate a reference type returned from a message without first directly copying the reference type.
type MessageType ¶
type MessageType int
const ( BeginBatch MessageType = iota BatchPoint EndBatch BufferedBatch Point Barrier DeleteGroup )
func (MessageType) String ¶
func (m MessageType) String() string
type MultiReceiver ¶
type MultiReceiver interface { BufferedBatch(src int, batch BufferedBatchMessage) error Point(src int, p PointMessage) error Barrier(src int, b BarrierMessage) error Finish() error }
type NameGetter ¶
type NameGetter interface {
Name() string
}
type NameSetter ¶
type NameSetter interface { NameGetter SetName(string) }
type PointMessage ¶
type PointMessage interface { Message ShallowCopy() PointMessage NameSetter Database() string SetDatabase(string) RetentionPolicy() string SetRetentionPolicy(string) GroupInfoer DimensionSetter SetTagsAndDimensions(models.Tags, models.Dimensions) FieldsTagsTimeSetter Bytes(precision string) []byte ToResult() models.Result ToRow() *models.Row }
PointMessage is a single point.
func NewPointMessage ¶
func NewPointMessage( name, database, retentionPolicy string, dimensions models.Dimensions, fields models.Fields, tags models.Tags, time time.Time) PointMessage
type PointMeta ¶
type PointMeta interface { NameGetter GroupInfoer DimensionGetter TagGetter TimeGetter }
PointMeta is the common read interface of point and batch messages.
type Receiver ¶
type Receiver interface { BeginBatch(begin BeginBatchMessage) error BatchPoint(bp BatchPointMessage) error EndBatch(end EndBatchMessage) error Point(p PointMessage) error Barrier(b BarrierMessage) error DeleteGroup(d DeleteGroupMessage) error }
Receiver handles messages as they arrive via a consumer.
func NewReceiverFromForwardReceiver ¶
func NewReceiverFromForwardReceiver(outs []Edge, r ForwardReceiver) Receiver
NewReceiverFromForwardReceiver creates a new receiver from the provided list of edges and forward receiver.
func NewReceiverFromForwardReceiverWithStats ¶
func NewReceiverFromForwardReceiverWithStats(outs []StatsEdge, r ForwardReceiver) Receiver
NewReceiverFromForwardReceiverWithStats creates a new receiver from the provided list of stats edges and forward receiver.
type StatsEdge ¶
type StatsEdge interface { Edge // Collected returns the number of messages collected by this edge. Collected() int64 // Emitted returns the number of messages emitted by this edge. Emitted() int64 // CollectedVar is an exported var that represents the number of messages collected by this edge. CollectedVar() expvar.IntVar // EmittedVar is an exported var that represents the number of messages emitted by this edge. EmittedVar() expvar.IntVar // ReadGroupStats allows for the reading of the current statistics by group. ReadGroupStats(func(*GroupStats)) }
StatsEdge is an edge that tracks various statistics about message passing through the edge.
func NewStatsEdge ¶
NewStatsEdge creates an edge that tracks statistics about the message passing through the edge.
type TimeGetter ¶
type TimeSetter ¶
type TimeSetter interface { TimeGetter SetTime(time.Time) }