edge

package
v1.7.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: MIT Imports: 14 Imported by: 78

Documentation

Overview

Package edge provides mechanisms for message passing along edges. Several composable interfaces are defined to aid in implementing a node which consumes messages from an edge.

Index

Constants

This section is empty.

Variables

View Source
var ErrAborted = errors.New("edge aborted")

ErrAborted is returned from the Edge interface when operations are performed on the edge after it has been aborted.

Functions

func Forward

func Forward(outs []StatsEdge, msg Message) error

func NewDeleteGroupMessage added in v1.5.0

func NewDeleteGroupMessage(info GroupInfo) *deleteGroupMessage

Types

type BarrierMessage

type BarrierMessage interface {
	Message
	// ShallowCopy returns a copy of the message that does not deeply copy reference types.
	ShallowCopy() BarrierMessage
	// Read-only accessors for the barrier's group, name, dimensions, tags and time.
	GroupInfoer
	NameGetter
	DimensionGetter
	TagGetter
	TimeGetter
}

BarrierMessage indicates that no data older than the barrier time will arrive.

func NewBarrierMessage

func NewBarrierMessage(group GroupInfo, time time.Time) BarrierMessage

type BatchBuffer

// BatchBuffer collects the messages of a batch through its BeginBatch and
// BatchPoint methods and produces a BufferedBatchMessage once the
// EndBatchMessage is supplied to its BufferedBatchMessage method.
type BatchBuffer struct {
	// contains filtered or unexported fields
}

BatchBuffer buffers batch messages into a BufferedBatchMessage.

func (*BatchBuffer) BatchPoint

func (r *BatchBuffer) BatchPoint(bp BatchPointMessage) error

func (*BatchBuffer) BeginBatch

func (r *BatchBuffer) BeginBatch(begin BeginBatchMessage) error

func (*BatchBuffer) BufferedBatchMessage

func (r *BatchBuffer) BufferedBatchMessage(end EndBatchMessage) BufferedBatchMessage

type BatchPointMessage

type BatchPointMessage interface {
	Message

	// ShallowCopy returns a copy of the message that does not deeply copy reference types.
	ShallowCopy() BatchPointMessage

	// Read and write access to the point's fields, tags and time.
	FieldsTagsTimeSetter
}

BatchPointMessage is a single point in a batch of data.

func BatchPointFromPoint

func BatchPointFromPoint(p PointMessage) BatchPointMessage

func NewBatchPointMessage

func NewBatchPointMessage(
	fields models.Fields,
	tags models.Tags,
	time time.Time,
) BatchPointMessage

type BatchPointMessages

// BatchPointMessages is a slice of BatchPointMessage. Its Len, Less and Swap
// methods match the method set of sort.Interface.
// NOTE(review): the ordering used by Less is not visible here — confirm before relying on it.
type BatchPointMessages []BatchPointMessage

func (BatchPointMessages) Len

func (l BatchPointMessages) Len() int

func (BatchPointMessages) Less

func (l BatchPointMessages) Less(i int, j int) bool

func (BatchPointMessages) Swap

func (l BatchPointMessages) Swap(i int, j int)

type BeginBatchMessage

type BeginBatchMessage interface {
	Message

	// ShallowCopy returns a copy of the message that does not deeply copy reference types.
	ShallowCopy() BeginBatchMessage

	// Name can be both read and replaced.
	NameSetter

	// Group information is read-only; tags and dimensions can be read and replaced.
	GroupInfoer
	TagSetter
	DimensionSetter
	// SetTagsAndDimensions accepts both the tags and the dimensions in a single call.
	SetTagsAndDimensions(models.Tags, models.Dimensions)

	// Time is the maximum time of any point in the batch
	TimeSetter

	// SizeHint provides a hint about the size of the batch to come.
	// If non-zero expect a batch with SizeHint points,
	// otherwise an unknown number of points are coming.
	SizeHint() int
	// SetSizeHint replaces the size hint.
	SetSizeHint(int)
}

BeginBatchMessage marks the beginning of a batch of points. Once a BeginBatchMessage is received, all subsequent messages will be BatchPointMessages until an EndBatchMessage is received.

func NewBeginBatchMessage

func NewBeginBatchMessage(
	name string,
	tags models.Tags,
	byName bool,
	tmax time.Time,
	sizeHint int,
) BeginBatchMessage

type BufferedBatchMessage

type BufferedBatchMessage interface {
	Message

	// ShallowCopy returns a copy of the message that does not deeply copy reference types.
	ShallowCopy() BufferedBatchMessage

	// Begin and SetBegin read and replace the batch's begin message.
	Begin() BeginBatchMessage
	SetBegin(BeginBatchMessage)

	// Expose common read interfaces of begin and point messages.
	PointMeta

	// Points and SetPoints read and replace the batch's point messages.
	Points() []BatchPointMessage
	SetPoints([]BatchPointMessage)

	// End and SetEnd read and replace the batch's end message.
	End() EndBatchMessage
	SetEnd(EndBatchMessage)

	// ToResult and ToRow return the batch as a models.Result and a *models.Row respectively.
	ToResult() models.Result
	ToRow() *models.Row
}

BufferedBatchMessage is a message containing all data for a single batch.

func NewBufferedBatchMessage

func NewBufferedBatchMessage(
	begin BeginBatchMessage,
	points []BatchPointMessage,
	end EndBatchMessage,
) BufferedBatchMessage

func ResultToBufferedBatches

func ResultToBufferedBatches(res influxdb.Result, groupByName bool) ([]BufferedBatchMessage, error)

type BufferedBatchMessageDecoder

// BufferedBatchMessageDecoder decodes BufferedBatchMessage values; one is
// obtained from NewBufferedBatchMessageDecoder, which takes an io.Reader.
type BufferedBatchMessageDecoder interface {
	// Decode returns a decoded BufferedBatchMessage or an error.
	Decode() (BufferedBatchMessage, error)
	// NOTE(review): More presumably reports whether another message is
	// available to decode — confirm against the implementation.
	More() bool
}

func NewBufferedBatchMessageDecoder

func NewBufferedBatchMessageDecoder(r io.Reader) BufferedBatchMessageDecoder

type BufferedReceiver

// BufferedReceiver is a Receiver that can additionally process a whole
// buffered batch in a single call.
type BufferedReceiver interface {
	Receiver
	// BufferedBatch processes an entire buffered batch.
	// Do not modify the batch or the slice of Points as it is shared.
	BufferedBatch(batch BufferedBatchMessage) error
}

type Consumer

// Consumer is the single-method interface implemented by everything that
// reads messages from one or more edges.
type Consumer interface {
	// Consume reads messages off an edge until the edge is closed or aborted.
	// An error is returned if either the edge or receiver errors.
	Consume() error
}

Consumer reads messages off an edge and passes them to a receiver.

func NewConsumerWithReceiver

func NewConsumerWithReceiver(e Edge, r Receiver) Consumer

NewConsumerWithReceiver creates a new consumer for the edge e and receiver r.

func NewMultiConsumer

func NewMultiConsumer(ins []Edge, r MultiReceiver) Consumer

func NewMultiConsumerWithStats

func NewMultiConsumerWithStats(ins []StatsEdge, r MultiReceiver) Consumer

type DeleteGroupMessage

// DeleteGroupMessage is a Message that carries the identity and info of a group.
// NOTE(review): presumably signals that the identified group has been
// deleted — confirm against the consumers of this message.
type DeleteGroupMessage interface {
	Message
	GroupInfoer
}

type DimensionGetter

// DimensionGetter provides read access to dimensions.
type DimensionGetter interface {
	// Dimensions returns the current dimensions.
	Dimensions() models.Dimensions
}

type DimensionSetter

// DimensionSetter extends DimensionGetter with the ability to replace the dimensions.
type DimensionSetter interface {
	DimensionGetter
	// SetDimensions replaces the dimensions.
	SetDimensions(models.Dimensions)
}

type Edge

// Edge is a unidirectional, asynchronous connection between two nodes.
// Note the different post-shutdown contracts documented below: after Close,
// Collect panics; after Abort, Collect returns ErrAborted.
type Edge interface {
	// Collect instructs the edge to accept a new message.
	Collect(Message) error
	// Emit blocks until a message is available and returns it or returns false if the edge has been closed or aborted.
	Emit() (Message, bool)
	// Close stops the edge, all messages currently buffered will be processed.
	// Future calls to Collect will panic.
	Close() error
	// Abort immediately stops the edge and all currently buffered messages are dropped.
	// Future calls to Collect return the error ErrAborted.
	Abort()
	// Type indicates whether the edge will emit stream or batch data.
	Type() pipeline.EdgeType
}

Edge represents the connection between two nodes that communicate via messages. Edge communication is unidirectional and asynchronous. Edges are safe for concurrent use.

func NewChannelEdge

func NewChannelEdge(typ pipeline.EdgeType, size int) Edge

NewChannelEdge returns a new edge that uses channels as the underlying transport.

type EndBatchMessage

type EndBatchMessage interface {
	Message

	// ShallowCopy returns a copy of the message that does not deeply copy reference types.
	ShallowCopy() EndBatchMessage
}

EndBatchMessage indicates that all points for a batch have arrived.

func NewEndBatchMessage

func NewEndBatchMessage() EndBatchMessage

type FieldGetter

// FieldGetter provides read access to fields.
type FieldGetter interface {
	// Fields returns the current fields.
	Fields() models.Fields
}

type FieldSetter

// FieldSetter extends FieldGetter with the ability to replace the fields.
type FieldSetter interface {
	FieldGetter
	// SetFields replaces the fields.
	SetFields(models.Fields)
}

type FieldsTagsTimeGetter

// FieldsTagsTimeGetter groups read access to fields, tags and time.
type FieldsTagsTimeGetter interface {
	FieldGetter
	TagGetter
	TimeGetter
}

type FieldsTagsTimeGetterMessage

// FieldsTagsTimeGetterMessage is a Message whose fields, tags and time can be read.
type FieldsTagsTimeGetterMessage interface {
	Message
	FieldsTagsTimeGetter
}

type FieldsTagsTimeSetter

// FieldsTagsTimeSetter groups read and write access to fields, tags and time.
type FieldsTagsTimeSetter interface {
	FieldSetter
	TagSetter
	TimeSetter
}

type ForwardBufferedReceiver

type ForwardBufferedReceiver interface {
	ForwardReceiver
	// BufferedBatch handles an entire buffered batch and returns the message
	// to forward; a nil message means nothing is forwarded.
	BufferedBatch(batch BufferedBatchMessage) (Message, error)
}

ForwardBufferedReceiver handles messages as they arrive and can return a message to be forwarded to output edges. If a returned message is nil, no message is forwarded.

type ForwardReceiver

type ForwardReceiver interface {
	// Each handler below receives one kind of message and returns the message
	// to forward to the output edges; a nil message means nothing is forwarded.
	BeginBatch(begin BeginBatchMessage) (Message, error)
	BatchPoint(bp BatchPointMessage) (Message, error)
	EndBatch(end EndBatchMessage) (Message, error)
	Point(p PointMessage) (Message, error)
	Barrier(b BarrierMessage) (Message, error)
	DeleteGroup(d DeleteGroupMessage) (Message, error)

	// Done is called once the receiver will no longer receive any messages.
	Done()
}

ForwardReceiver handles messages as they arrive and can return a message to be forwarded to output edges. If a returned message is nil, no message is forwarded.

func NewTimedForwardReceiver

func NewTimedForwardReceiver(t timer.Timer, r ForwardReceiver) ForwardReceiver

NewTimedForwardReceiver creates a forward receiver which uses the timer t to measure the time spent in r.

type GroupIDGetter

// GroupIDGetter provides read access to a group ID.
type GroupIDGetter interface {
	// GroupID returns the group ID.
	GroupID() models.GroupID
}

type GroupInfo

type GroupInfo struct {
	// ID identifies the group.
	ID         models.GroupID
	// Tags and Dimensions hold the additional information carried about the group.
	Tags       models.Tags
	Dimensions models.Dimensions
}

GroupInfo identifies and contains information about a specific group.

type GroupInfoer

// GroupInfoer provides read access to both a group ID and the full GroupInfo.
type GroupInfoer interface {
	GroupIDGetter
	// GroupInfo returns the group information.
	GroupInfo() GroupInfo
}

type GroupStats

type GroupStats struct {
	// GroupInfo identifies the group these statistics belong to.
	GroupInfo GroupInfo
	// NOTE(review): Collected and Emitted are presumably the per-group counts
	// of messages collected and emitted by the edge — confirm.
	Collected int64
	Emitted   int64
}

GroupStats represents the statistics for a specific group.

type GroupedConsumer

// GroupedConsumer is a Consumer that also exposes how many groups it manages.
type GroupedConsumer interface {
	Consumer
	// CardinalityVar is an exported var that indicates the current number of groups being managed.
	CardinalityVar() expvar.IntVar
}

GroupedConsumer reads messages off an edge and passes them by group to receivers created from a grouped receiver.

func NewGroupedConsumer

func NewGroupedConsumer(e Edge, r GroupedReceiver) GroupedConsumer

NewGroupedConsumer creates a new grouped consumer for edge e and grouped receiver r.

type GroupedReceiver

type GroupedReceiver interface {
	// NewGroup signals that a new group has been discovered in the data.
	// Information on the group and the message that first triggered its creation are provided.
	// It returns a Receiver for the group, or an error.
	NewGroup(group GroupInfo, first PointMeta) (Receiver, error)
}

GroupedReceiver creates and deletes receivers as groups are created and deleted.

type Message

// Message is the minimal interface shared by everything passed along an edge.
type Message interface {
	// Type returns the type of the message.
	Type() MessageType
}

Message represents data to be passed along an edge. Messages can be shared across many contexts.

All messages implement their own ShallowCopy method. All ShallowCopy methods create a copy of the message but do not deeply copy any reference types.

Never mutate a reference type returned from a message without first directly copying the reference type.

type MessageType

// MessageType enumerates the kinds of message; it is the value returned by Message.Type.
type MessageType int
const (
	// The values are assigned sequentially via iota, starting with BeginBatch at 0.
	// NOTE(review): each constant presumably tags the message interface with the
	// matching name (e.g. Point for PointMessage) — confirm.
	BeginBatch MessageType = iota
	BatchPoint
	EndBatch
	BufferedBatch
	Point
	Barrier
	DeleteGroup
)

func (MessageType) String

func (m MessageType) String() string

type MultiReceiver

// MultiReceiver handles messages for a consumer built over several edges
// (see NewMultiConsumer and NewMultiConsumerWithStats).
// NOTE(review): src is presumably the index of the originating edge in the
// consumer's input slice — confirm.
type MultiReceiver interface {
	BufferedBatch(src int, batch BufferedBatchMessage) error
	Point(src int, p PointMessage) error
	Barrier(src int, b BarrierMessage) error
	Delete(src int, d DeleteGroupMessage) error
	// NOTE(review): Finish is presumably called once no more messages will arrive — confirm.
	Finish() error
}

type NameGetter

// NameGetter provides read access to a name.
type NameGetter interface {
	// Name returns the name.
	Name() string
}

type NameSetter

// NameSetter extends NameGetter with the ability to replace the name.
type NameSetter interface {
	NameGetter
	// SetName replaces the name.
	SetName(string)
}

type PointMessage

type PointMessage interface {
	Message

	// ShallowCopy returns a copy of the message that does not deeply copy reference types.
	ShallowCopy() PointMessage

	// Name can be both read and replaced.
	NameSetter

	// Database and retention policy can be both read and replaced.
	Database() string
	SetDatabase(string)
	RetentionPolicy() string
	SetRetentionPolicy(string)

	// Group information is read-only.
	GroupInfoer

	// Dimensions can be read and replaced.
	DimensionSetter
	// SetTagsAndDimensions accepts both the tags and the dimensions in a single call.
	SetTagsAndDimensions(models.Tags, models.Dimensions)

	// Fields, tags and time can be read and replaced.
	FieldsTagsTimeSetter

	// Bytes returns a byte representation of the point for the given precision.
	// NOTE(review): the encoding and the accepted precision strings are not visible here — confirm.
	Bytes(precision string) []byte

	// ToResult and ToRow return the point as a models.Result and a *models.Row respectively.
	ToResult() models.Result
	ToRow() *models.Row
}

PointMessage is a single point.

func NewPointMessage

func NewPointMessage(
	name,
	database,
	retentionPolicy string,
	dimensions models.Dimensions,
	fields models.Fields,
	tags models.Tags,
	time time.Time) PointMessage

type PointMeta

PointMeta is the set of common read interfaces of point and batch messages.

type Receiver

type Receiver interface {
	// Each handler below receives one kind of message and returns an error
	// if handling it failed.
	BeginBatch(begin BeginBatchMessage) error
	BatchPoint(bp BatchPointMessage) error
	EndBatch(end EndBatchMessage) error
	Point(p PointMessage) error
	Barrier(b BarrierMessage) error
	DeleteGroup(d DeleteGroupMessage) error

	// Done is called once the receiver will no longer receive any messages.
	Done()
}

Receiver handles messages as they arrive via a consumer.

func NewReceiverFromForwardReceiver

func NewReceiverFromForwardReceiver(outs []Edge, r ForwardReceiver) Receiver

NewReceiverFromForwardReceiver creates a new receiver from the provided list of edges and forward receiver.

func NewReceiverFromForwardReceiverWithStats

func NewReceiverFromForwardReceiverWithStats(outs []StatsEdge, r ForwardReceiver) Receiver

NewReceiverFromForwardReceiverWithStats creates a new receiver from the provided list of stats edges and forward receiver.

type StatsEdge

type StatsEdge interface {
	Edge
	// Collected returns the number of messages collected by this edge.
	Collected() int64
	// Emitted returns the number of messages emitted by this edge.
	Emitted() int64
	// CollectedVar is an exported var that represents the number of messages collected by this edge.
	CollectedVar() expvar.IntVar
	// EmittedVar is an exported var that represents the number of messages emitted by this edge.
	EmittedVar() expvar.IntVar
	// ReadGroupStats allows for the reading of the current statistics by group.
	// The supplied function receives a *GroupStats.
	ReadGroupStats(func(*GroupStats))
}

StatsEdge is an edge that tracks various statistics about message passing through the edge.

func NewStatsEdge

func NewStatsEdge(e Edge) StatsEdge

NewStatsEdge creates an edge that tracks statistics about the message passing through the edge.

type TagGetter

// TagGetter provides read access to tags.
type TagGetter interface {
	// Tags returns the current tags.
	Tags() models.Tags
}

type TagSetter

// TagSetter extends TagGetter with the ability to replace the tags.
type TagSetter interface {
	TagGetter
	// SetTags replaces the tags.
	SetTags(models.Tags)
}

type TimeGetter

// TimeGetter provides read access to a timestamp.
type TimeGetter interface {
	// Time returns the timestamp.
	Time() time.Time
}

type TimeSetter

// TimeSetter extends TimeGetter with the ability to replace the timestamp.
type TimeSetter interface {
	TimeGetter
	// SetTime replaces the timestamp.
	SetTime(time.Time)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL