flowgraph

package
v0.10.3-0...-304cdc7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CloseGracefully  bool = true
	CloseImmediately bool = false
)

Variables

This section is empty.

Functions

func NewNodeCtxManager

func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManager

NewNodeCtxManager init with the inputNode and fg.closeWg

Types

type BaseMsg

type BaseMsg struct {
	// contains filtered or unexported fields
}

func NewBaseMsg

func NewBaseMsg(isCloseMsg bool) BaseMsg

func (BaseMsg) IsCloseMsg

func (msg BaseMsg) IsCloseMsg() bool

type BaseNode

type BaseNode struct {
	// contains filtered or unexported fields
}

BaseNode defines some common node attributes and behavior

func (*BaseNode) Close

func (node *BaseNode) Close()

Close implementing Node, base node does nothing when stops

func (*BaseNode) IsInputNode

func (node *BaseNode) IsInputNode() bool

IsInputNode returns whether Node is InputNode, BaseNode is not InputNode by default

func (*BaseNode) IsValidInMsg

func (node *BaseNode) IsValidInMsg(in []Msg) bool

func (*BaseNode) MaxParallelism

func (node *BaseNode) MaxParallelism() int32

MaxParallelism returns the maximal parallelism

func (*BaseNode) MaxQueueLength

func (node *BaseNode) MaxQueueLength() int32

MaxQueueLength returns the maximal queue length

func (*BaseNode) Name

func (node *BaseNode) Name() string

func (*BaseNode) Operate

func (node *BaseNode) Operate(in []Msg) []Msg

func (*BaseNode) SetMaxParallelism

func (node *BaseNode) SetMaxParallelism(n int32)

SetMaxParallelism is used to set the maximal parallelism

func (*BaseNode) SetMaxQueueLength

func (node *BaseNode) SetMaxQueueLength(n int32)

SetMaxQueueLength is used to set the maximal queue length

func (*BaseNode) Start

func (node *BaseNode) Start()

Start implementing Node, base node does nothing when starts

type InputNode

type InputNode struct {
	BaseNode
	// contains filtered or unexported fields
}

InputNode is the entry point of flowgragh

func NewInputNode

func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLength int32, maxParallelism int32, role string, nodeID int64, collectionID int64, dataType string) *InputNode

NewInputNode composes an InputNode with provided input channel, name and parameters

func (*InputNode) IsInputNode

func (inNode *InputNode) IsInputNode() bool

IsInputNode returns whether Node is InputNode

func (*InputNode) IsValidInMsg

func (inNode *InputNode) IsValidInMsg(in []Msg) bool

func (*InputNode) Name

func (inNode *InputNode) Name() string

Name returns node name

func (*InputNode) Operate

func (inNode *InputNode) Operate(in []Msg) []Msg

Operate consume a message pack from msgstream and return

func (*InputNode) SetCloseMethod

func (inNode *InputNode) SetCloseMethod(gracefully bool)

type Msg

type Msg interface {
	TimeTick() Timestamp
	IsClose() bool
}

Msg is an abstract class that contains a method to get the time tick of this message

type MsgPosition

type MsgPosition = msgpb.MsgPosition

MsgPosition shortcut for msgpb.MsgPosition

type MsgStreamMsg

type MsgStreamMsg struct {
	BaseMsg
	// contains filtered or unexported fields
}

MsgStreamMsg is a wrapper of TsMsg in flowgraph

func GenerateMsgStreamMsg

func GenerateMsgStreamMsg(tsMessages []msgstream.TsMsg, timestampMin, timestampMax Timestamp, startPos []*MsgPosition, endPos []*MsgPosition) *MsgStreamMsg

GenerateMsgStreamMsg is used to create a new MsgStreamMsg object

func (*MsgStreamMsg) DownStreamNodeIdx

func (msMsg *MsgStreamMsg) DownStreamNodeIdx() int

DownStreamNodeIdx returns 0

func (*MsgStreamMsg) EndPositions

func (msMsg *MsgStreamMsg) EndPositions() []*MsgPosition

EndPositions returns the end position of TsMsgs

func (*MsgStreamMsg) IsClose

func (msMsg *MsgStreamMsg) IsClose() bool

func (*MsgStreamMsg) StartPositions

func (msMsg *MsgStreamMsg) StartPositions() []*MsgPosition

StartPositions returns the start position of TsMsgs

func (*MsgStreamMsg) TimeTick

func (msMsg *MsgStreamMsg) TimeTick() Timestamp

TimeTick returns the timetick of this message

func (*MsgStreamMsg) TimestampMax

func (msMsg *MsgStreamMsg) TimestampMax() Timestamp

TimestampMax returns the maximal timestamp in the TsMsg list

func (*MsgStreamMsg) TimestampMin

func (msMsg *MsgStreamMsg) TimestampMin() Timestamp

TimestampMin returns the minimal timestamp in the TsMsg list

func (*MsgStreamMsg) TsMessages

func (msMsg *MsgStreamMsg) TsMessages() []msgstream.TsMsg

TsMessages returns the origin TsMsg object list

type Node

type Node interface {
	Name() string
	MaxQueueLength() int32
	MaxParallelism() int32
	IsValidInMsg(in []Msg) bool
	Operate(in []Msg) []Msg
	IsInputNode() bool
	Start()
	Close()
}

Node is the interface defines the behavior of flowgraph

type NodeName

type NodeName = string

NodeName shortcut for string key

type TimeTickedFlowGraph

type TimeTickedFlowGraph struct {
	// contains filtered or unexported fields
}

TimeTickedFlowGraph flowgraph with input from tt msg stream

func NewTimeTickedFlowGraph

func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph

NewTimeTickedFlowGraph create timetick flowgraph

func (*TimeTickedFlowGraph) AddNode

func (fg *TimeTickedFlowGraph) AddNode(node Node)

AddNode add Node into flowgraph and fill nodeCtxManager

func (*TimeTickedFlowGraph) AssembleNodes

func (fg *TimeTickedFlowGraph) AssembleNodes(orderedNodes ...Node) error

func (*TimeTickedFlowGraph) Blockall

func (fg *TimeTickedFlowGraph) Blockall()

func (*TimeTickedFlowGraph) Close

func (fg *TimeTickedFlowGraph) Close()

Close closes all nodes in flowgraph

func (*TimeTickedFlowGraph) SetCloseMethod

func (fg *TimeTickedFlowGraph) SetCloseMethod(gracefully bool)

func (*TimeTickedFlowGraph) SetEdges

func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, out []string) error

SetEdges set directed edges from in nodes to out nodes

func (*TimeTickedFlowGraph) Start

func (fg *TimeTickedFlowGraph) Start()

Start starts all nodes in timetick flowgragh

func (*TimeTickedFlowGraph) Status

func (fg *TimeTickedFlowGraph) Status() string

Status returns the status of the pipeline, it will return "Healthy" if the input node has received any msg in the last nodeTtInterval

func (*TimeTickedFlowGraph) Unblock

func (fg *TimeTickedFlowGraph) Unblock()

type Timestamp

type Timestamp = typeutil.Timestamp

Timestamp shortcut for typeutil.Timestamp

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL