Documentation ¶
Index ¶
- Constants
- func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManager
- type BaseMsg
- type BaseNode
- func (node *BaseNode) Close()
- func (node *BaseNode) IsInputNode() bool
- func (node *BaseNode) IsValidInMsg(in []Msg) bool
- func (node *BaseNode) MaxParallelism() int32
- func (node *BaseNode) MaxQueueLength() int32
- func (node *BaseNode) Name() string
- func (node *BaseNode) Operate(in []Msg) []Msg
- func (node *BaseNode) SetMaxParallelism(n int32)
- func (node *BaseNode) SetMaxQueueLength(n int32)
- func (node *BaseNode) Start()
- type InputNode
- type Msg
- type MsgPosition
- type MsgStreamMsg
- func (msMsg *MsgStreamMsg) DownStreamNodeIdx() int
- func (msMsg *MsgStreamMsg) EndPositions() []*MsgPosition
- func (msMsg *MsgStreamMsg) IsClose() bool
- func (msMsg *MsgStreamMsg) StartPositions() []*MsgPosition
- func (msMsg *MsgStreamMsg) TimeTick() Timestamp
- func (msMsg *MsgStreamMsg) TimestampMax() Timestamp
- func (msMsg *MsgStreamMsg) TimestampMin() Timestamp
- func (msMsg *MsgStreamMsg) TsMessages() []msgstream.TsMsg
- type Node
- type NodeName
- type TimeTickedFlowGraph
- func (fg *TimeTickedFlowGraph) AddNode(node Node)
- func (fg *TimeTickedFlowGraph) AssembleNodes(orderedNodes ...Node) error
- func (fg *TimeTickedFlowGraph) Blockall()
- func (fg *TimeTickedFlowGraph) Close()
- func (fg *TimeTickedFlowGraph) SetCloseMethod(gracefully bool)
- func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, out []string) error
- func (fg *TimeTickedFlowGraph) Start()
- func (fg *TimeTickedFlowGraph) Status() string
- func (fg *TimeTickedFlowGraph) Unblock()
- type Timestamp
Constants ¶
const ( CloseGracefully bool = true CloseImmediately bool = false )
Variables ¶
This section is empty.
Functions ¶
func NewNodeCtxManager ¶
NewNodeCtxManager init with the inputNode and fg.closeWg
Types ¶
type BaseMsg ¶
type BaseMsg struct {
// contains filtered or unexported fields
}
func NewBaseMsg ¶
func (BaseMsg) IsCloseMsg ¶
type BaseNode ¶
type BaseNode struct {
// contains filtered or unexported fields
}
BaseNode defines some common node attributes and behavior
func (*BaseNode) Close ¶
func (node *BaseNode) Close()
Close implementing Node, base node does nothing when stops
func (*BaseNode) IsInputNode ¶
IsInputNode returns whether Node is InputNode, BaseNode is not InputNode by default
func (*BaseNode) IsValidInMsg ¶
func (*BaseNode) MaxParallelism ¶
MaxParallelism returns the maximal parallelism
func (*BaseNode) MaxQueueLength ¶
MaxQueueLength returns the maximal queue length
func (*BaseNode) SetMaxParallelism ¶
SetMaxParallelism is used to set the maximal parallelism
func (*BaseNode) SetMaxQueueLength ¶
SetMaxQueueLength is used to set the maximal queue length
type InputNode ¶
type InputNode struct { BaseNode // contains filtered or unexported fields }
InputNode is the entry point of flowgragh
func NewInputNode ¶
func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLength int32, maxParallelism int32, role string, nodeID int64, collectionID int64, dataType string) *InputNode
NewInputNode composes an InputNode with provided input channel, name and parameters
func (*InputNode) IsInputNode ¶
IsInputNode returns whether Node is InputNode
func (*InputNode) IsValidInMsg ¶
func (*InputNode) SetCloseMethod ¶
type MsgStreamMsg ¶
type MsgStreamMsg struct { BaseMsg // contains filtered or unexported fields }
MsgStreamMsg is a wrapper of TsMsg in flowgraph
func GenerateMsgStreamMsg ¶
func GenerateMsgStreamMsg(tsMessages []msgstream.TsMsg, timestampMin, timestampMax Timestamp, startPos []*MsgPosition, endPos []*MsgPosition) *MsgStreamMsg
GenerateMsgStreamMsg is used to create a new MsgStreamMsg object
func (*MsgStreamMsg) DownStreamNodeIdx ¶
func (msMsg *MsgStreamMsg) DownStreamNodeIdx() int
DownStreamNodeIdx returns 0
func (*MsgStreamMsg) EndPositions ¶
func (msMsg *MsgStreamMsg) EndPositions() []*MsgPosition
EndPositions returns the end position of TsMsgs
func (*MsgStreamMsg) IsClose ¶
func (msMsg *MsgStreamMsg) IsClose() bool
func (*MsgStreamMsg) StartPositions ¶
func (msMsg *MsgStreamMsg) StartPositions() []*MsgPosition
StartPositions returns the start position of TsMsgs
func (*MsgStreamMsg) TimeTick ¶
func (msMsg *MsgStreamMsg) TimeTick() Timestamp
TimeTick returns the timetick of this message
func (*MsgStreamMsg) TimestampMax ¶
func (msMsg *MsgStreamMsg) TimestampMax() Timestamp
TimestampMax returns the maximal timestamp in the TsMsg list
func (*MsgStreamMsg) TimestampMin ¶
func (msMsg *MsgStreamMsg) TimestampMin() Timestamp
TimestampMin returns the minimal timestamp in the TsMsg list
func (*MsgStreamMsg) TsMessages ¶
func (msMsg *MsgStreamMsg) TsMessages() []msgstream.TsMsg
TsMessages returns the origin TsMsg object list
type Node ¶
type Node interface { Name() string MaxQueueLength() int32 MaxParallelism() int32 IsValidInMsg(in []Msg) bool Operate(in []Msg) []Msg IsInputNode() bool Start() Close() }
Node is the interface defines the behavior of flowgraph
type TimeTickedFlowGraph ¶
type TimeTickedFlowGraph struct {
// contains filtered or unexported fields
}
TimeTickedFlowGraph flowgraph with input from tt msg stream
func NewTimeTickedFlowGraph ¶
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph
NewTimeTickedFlowGraph create timetick flowgraph
func (*TimeTickedFlowGraph) AddNode ¶
func (fg *TimeTickedFlowGraph) AddNode(node Node)
AddNode add Node into flowgraph and fill nodeCtxManager
func (*TimeTickedFlowGraph) AssembleNodes ¶
func (fg *TimeTickedFlowGraph) AssembleNodes(orderedNodes ...Node) error
func (*TimeTickedFlowGraph) Blockall ¶
func (fg *TimeTickedFlowGraph) Blockall()
func (*TimeTickedFlowGraph) Close ¶
func (fg *TimeTickedFlowGraph) Close()
Close closes all nodes in flowgraph
func (*TimeTickedFlowGraph) SetCloseMethod ¶
func (fg *TimeTickedFlowGraph) SetCloseMethod(gracefully bool)
func (*TimeTickedFlowGraph) SetEdges ¶
func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, out []string) error
SetEdges set directed edges from in nodes to out nodes
func (*TimeTickedFlowGraph) Start ¶
func (fg *TimeTickedFlowGraph) Start()
Start starts all nodes in timetick flowgragh
func (*TimeTickedFlowGraph) Status ¶
func (fg *TimeTickedFlowGraph) Status() string
Status returns the status of the pipeline, it will return "Healthy" if the input node has received any msg in the last nodeTtInterval
func (*TimeTickedFlowGraph) Unblock ¶
func (fg *TimeTickedFlowGraph) Unblock()