comp

package
v0.0.3 (Latest)

Note: This package is not in the latest version of its module.
Published: Jul 12, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Constants

const (
	TypeENTRY    = "entry"
	TypePUBSUB   = "pubsub"
	TypeDISPATCH = "dispatch"
)
const (
	CtrlCall = iota + 10000
	CtrlCast
	RawByte
	Generic
)

public commands
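
Because the block starts at iota + 10000, the four commands take consecutive values beginning at 10000. The stand-alone snippet below redeclares the constants locally (it does not import the package) just to make the resulting values explicit; the describe helper is an illustration only.

```go
package main

import "fmt"

// Local copies of the documented constants, redeclared so that this
// snippet compiles without importing the package.
const (
	CtrlCall = iota + 10000
	CtrlCast
	RawByte
	Generic
)

// describe formats the four command values in declaration order.
// It is a helper invented for this example.
func describe() string {
	return fmt.Sprintf("CtrlCall=%d CtrlCast=%d RawByte=%d Generic=%d",
		CtrlCall, CtrlCast, RawByte, Generic)
}

func main() {
	fmt.Println(describe())
	// prints: CtrlCall=10000 CtrlCast=10001 RawByte=10002 Generic=10003
}
```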

const PubsubDefaultDeliveryTimeout = 20 * time.Millisecond
const SYS_NODE_SCOPE = "sys"

Variables

This section is empty.

Functions

func InitLogger

func InitLogger(gl *logrus.Logger)

func RegisterNodeFactory

func RegisterNodeFactory(typeName string, f SessionNodeFactory) error

func With

func With(args ...string) []string

func WithConnect

func WithConnect(toSession, toName string) []string

WithConnect dynamically asks the callee to set its data pipe endpoint.

func WithError

func WithError(args ...string) (r []string)

WithError returns an abnormal reply.

func WithOk

func WithOk(args ...string) (r []string)

WithOk returns a normal reply.

func WithString

func WithString(args string) (result []string, err error)

WithString parses a string into substrings separated by spaces or tabs. Double quotes are required if a substring contains a space, and a backslash is required to escape a double quote, for example: cmd arg_a "arg with space", "arg with \" escaped \" quote"
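
The quoting rules above can be sketched as a small tokenizer. This is not the package's implementation: splitArgs is a hypothetical stand-in written only from the description, and its handling of edge cases (an unterminated quote is an error, an empty "" yields an empty token, a backslash is special only before a double quote inside quotes) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// splitArgs is a hypothetical stand-in that follows the documented rules;
// it is not the package's WithString.
func splitArgs(s string) ([]string, error) {
	var (
		out     []string
		cur     strings.Builder
		inQuote bool
		started bool // true once the current token has begun
	)
	// flush appends the current token, if any, to the result.
	flush := func() {
		if started {
			out = append(out, cur.String())
			cur.Reset()
			started = false
		}
	}
	rs := []rune(s)
	for i := 0; i < len(rs); i++ {
		r := rs[i]
		switch {
		case r == '\\' && inQuote && i+1 < len(rs) && rs[i+1] == '"':
			cur.WriteRune('"') // escaped quote inside a quoted substring
			i++
		case r == '"':
			inQuote = !inQuote
			started = true // assumption: an empty "" still counts as a token
		case (r == ' ' || r == '\t') && !inQuote:
			flush()
		default:
			cur.WriteRune(r)
			started = true
		}
	}
	if inQuote {
		// Assumption: an unterminated quote is reported as an error.
		return nil, fmt.Errorf("unterminated double quote in %q", s)
	}
	flush()
	return out, nil
}

func main() {
	args, err := splitArgs(`cmd arg_a "arg with space" "arg with \" escaped \" quote"`)
	if err != nil {
		panic(err)
	}
	for _, a := range args {
		fmt.Printf("%q\n", a)
	}
}
```

With this input the sketch yields four tokens: cmd, arg_a, arg with space, and arg with " escaped " quote.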

Types

type Cloneable

type Cloneable interface {
	Clone() Cloneable
}

type CloneableMessage added in v0.0.3

type CloneableMessage interface {
	Message
	Clone() CloneableMessage
}

A message must implement CloneableMessage if it needs to pass through a pubsub node.
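
A likely reason for this requirement is that a pubsub node hands the same message to several subscribers, so each one needs its own copy. The sketch below illustrates that idea with local stand-ins: Event replaces event.Event, and rawBytes and fanOut are hypothetical names that do not exist in the package.

```go
package main

import "fmt"

// Event is a local stand-in for event.Event.
type Event struct{ Payload interface{} }

// Message and CloneableMessage mirror the documented interfaces,
// using the stand-in Event type.
type Message interface{ AsEvent() *Event }

type CloneableMessage interface {
	Message
	Clone() CloneableMessage
}

// rawBytes is a hypothetical byte-slice message.
type rawBytes []byte

func (m rawBytes) AsEvent() *Event { return &Event{Payload: m} }

// Clone copies the underlying bytes so the copy can be modified freely.
func (m rawBytes) Clone() CloneableMessage {
	c := make(rawBytes, len(m))
	copy(c, m)
	return c
}

func (m rawBytes) String() string { return fmt.Sprintf("%q", []byte(m)) }

// fanOut returns n independent copies of msg, one per subscriber.
func fanOut(msg CloneableMessage, n int) []CloneableMessage {
	out := make([]CloneableMessage, n)
	for i := range out {
		out[i] = msg.Clone()
	}
	return out
}

func main() {
	orig := rawBytes("abc")
	copies := fanOut(orig, 2)
	copies[0].(rawBytes)[0] = 'X' // one subscriber mutates its copy
	fmt.Println(orig, copies[0], copies[1])
}
```

Mutating one copy leaves the original and the other copy untouched, which is the property a fan-out node relies on.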

type Composer

type Composer struct {
	// contains filtered or unexported fields
}

Composer creates every node and configures its properties based on the collected information, then links the nodes and sends the initial events as described before putting them into a working state.

func NewSessionComposer

func NewSessionComposer(sessionId string) *Composer

func (*Composer) ComposeNodes

func (c *Composer) ComposeNodes(graph *event.Graph) (err error)

ComposeNodes creates node instances by type, adds them to the graph, and links them.

func (*Composer) ExitGraph

func (c *Composer) ExitGraph()

func (*Composer) GetController

func (c *Composer) GetController() Controller

func (*Composer) GetMessageProvider

func (c *Composer) GetMessageProvider(name string) MessageProvider

GetMessageProvider gets an entry by its name.

func (*Composer) GetMessageProviderList

func (c *Composer) GetMessageProviderList() (mps []MessageProvider)

func (*Composer) GetSortedNodes

func (c *Composer) GetSortedNodes() (ni []*nmd.NodeDef)

func (*Composer) LinkChannel

func (c *Composer) LinkChannel(name string, ch chan<- *event.Event) error

func (*Composer) ParseGraphDescription

func (c *Composer) ParseGraphDescription(desc string) (err error)

func (*Composer) UnlinkChannel

func (c *Composer) UnlinkChannel(name string) error

type ComposerAware

type ComposerAware interface {
	// PreSetup is called after the composer has parsed the graph description and before it creates node instances
	PreSetup(c *Composer) error

	// PostSetup is called after the composer has created and set up the nodes
	PostSetup(c *Composer) error
}

ComposerAware enables a type to interact with the composer at each phase.
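
The ordering of the two hooks can be sketched as follows. Only the ordering comes from the interface comments; the composer struct, the compose function, the way hooks are passed in, and the error wrapping are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// composer is a hypothetical stand-in that only records what happened.
type composer struct{ steps []string }

func (c *composer) trace() string { return strings.Join(c.steps, " > ") }

// composerAware mirrors the documented interface, using the stand-in type.
type composerAware interface {
	PreSetup(c *composer) error
	PostSetup(c *composer) error
}

// compose runs the phases in the documented order and stops at the
// first hook error (stopping early is an assumption).
func compose(c *composer, hooks ...composerAware) error {
	c.steps = append(c.steps, "parse graph description")
	for _, h := range hooks {
		if err := h.PreSetup(c); err != nil {
			return fmt.Errorf("pre-setup: %w", err)
		}
	}
	c.steps = append(c.steps, "create and set up nodes")
	for _, h := range hooks {
		if err := h.PostSetup(c); err != nil {
			return fmt.Errorf("post-setup: %w", err)
		}
	}
	return nil
}

// tracer is a hook that records when it is called.
type tracer struct{}

func (tracer) PreSetup(c *composer) error {
	c.steps = append(c.steps, "PreSetup")
	return nil
}

func (tracer) PostSetup(c *composer) error {
	c.steps = append(c.steps, "PostSetup")
	return nil
}

func main() {
	c := &composer{}
	if err := compose(c, tracer{}); err != nil {
		panic(err)
	}
	fmt.Println(c.trace())
}
```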

type Controller

type Controller interface {
	// Call sends a message and waits for the response (blocking)
	Call(session, nodeName string, args []string) (resp []string)

	// Cast sends a message and does not wait (non-blocking)
	Cast(session, nodeName string, args []string)

	// PushData sends bytes only to a node of the same session
	PushData(nodeName string, msgType string, data []byte)
}

Controller can be used by session commands and by nodes in the event graph to invoke actions of other nodes. It provides a unified way to send control messages to nodes without any knowledge of links (the controller establishes links to all nodes beforehand so that it can communicate with them), which simplifies the programming pattern.

type CtrlMessage

type CtrlMessage struct {
	M []string
	C chan []string // used to receive result if not nil
}

CtrlMessage carries a function invocation that is either called (a result is expected on C) or cast (C is nil).
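
The role of the optional reply channel can be illustrated with a minimal sketch. The ctrlMessage struct copies the documented fields; the serve, call and cast functions are hypothetical, and the reply contents are invented for the example.

```go
package main

import "fmt"

// ctrlMessage has the same fields as the documented CtrlMessage.
type ctrlMessage struct {
	M []string
	C chan []string // used to receive the result if not nil
}

// serve is a hypothetical node loop: it replies only when the sender
// supplied a reply channel.
func serve(in <-chan ctrlMessage) {
	for msg := range in {
		reply := []string{"ok", fmt.Sprintf("%d args", len(msg.M))}
		if msg.C != nil {
			msg.C <- reply
		}
	}
}

// call sends a message and blocks until the reply arrives.
func call(in chan<- ctrlMessage, args ...string) []string {
	c := make(chan []string, 1)
	in <- ctrlMessage{M: args, C: c}
	return <-c
}

// cast sends a message without a reply channel and does not wait for a result.
func cast(in chan<- ctrlMessage, args ...string) {
	in <- ctrlMessage{M: args}
}

func main() {
	in := make(chan ctrlMessage)
	go serve(in)
	cast(in, "mute")
	fmt.Println(call(in, "status", "verbose"))
	close(in)
}
```

In this sketch the input channel is unbuffered, so cast still waits for the node to pick the message up; it only skips waiting for a result.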

func (*CtrlMessage) AsEvent added in v0.0.3

func (cm *CtrlMessage) AsEvent() *event.Event

type Dispatch

type Dispatch struct {
	SessionNode
	event.NodeProperty // necessary when the node number exceeds default maxLink of event graph
	// contains filtered or unexported fields
}

Dispatch is a bridge between every session's nodes and commands. Business commands normally change a node's behaviour by sending control messages to it with Call() or Cast(). Nodes react to these messages, which may cause more messages to be sent (changing other nodes' behaviour in a chain reaction). All of these messages are sent from Dispatch. It is a SessionNode that also implements the Controller interface, so it provides an API to callers and can send messages to any other node in the event graph.

func (*Dispatch) Call

func (d *Dispatch) Call(session, nodeName string, args []string) (resp []string)

Call sends a control message to a node in the graph and waits for its reply. If session is "", the message is sent to nodes of the local session.

func (*Dispatch) Cast

func (d *Dispatch) Cast(session, nodeName string, args []string)

Cast sends a control message to a node in the graph. If session is "", the message is sent to nodes of the local session.
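
The empty-session rule shared by Call and Cast amounts to a simple default. The sketch below shows one way to express it; target, resolve and the "session/node" label format are hypothetical and are not taken from the package.

```go
package main

import "fmt"

// target identifies the node a control message is addressed to.
// It is a hypothetical type for this example.
type target struct{ Session, Node string }

// String uses an invented "session/node" format.
func (t target) String() string { return fmt.Sprintf("%s/%s", t.Session, t.Node) }

// resolve applies the documented rule: an empty session means the
// caller's own (local) session.
func resolve(localSession, session, nodeName string) target {
	if session == "" {
		session = localSession
	}
	return target{Session: session, Node: nodeName}
}

func main() {
	fmt.Println(resolve("s1", "", "mixer"))   // addressed to the local session
	fmt.Println(resolve("s1", "s2", "mixer")) // addressed to another session
}
```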

func (*Dispatch) PushData added in v0.0.3

func (d *Dispatch) PushData(nodeName string, msgType string, data []byte)

type EntryNode

type EntryNode struct {
	SessionNode
	// contains filtered or unexported fields
}

EntryNode is a basic message provider that simply forwards data messages to the event graph.

func (*EntryNode) CanHandlePayloadType

func (e *EntryNode) CanHandlePayloadType(pt uint8) bool

func (*EntryNode) GetName

func (e *EntryNode) GetName() string

func (*EntryNode) Init added in v0.0.3

func (e *EntryNode) Init() error

func (*EntryNode) Priority

func (e *EntryNode) Priority() uint32

func (*EntryNode) PushMessage

func (e *EntryNode) PushMessage(msg Message) error

type GenericMessage added in v0.0.3

type GenericMessage struct {
	Subtype string
	Obj     interface{}
}

GenericMessage encapsulates a custom object in a message tagged with a type name. Nodes can communicate only with other nodes that build or handle GenericMessage values of the same tagged type.

func (*GenericMessage) AsEvent added in v0.0.3

func (gm *GenericMessage) AsEvent() *event.Event

func (*GenericMessage) Clone added in v0.0.3

func (gm *GenericMessage) Clone() (obj CloneableMessage)

Clone returns a non-nil object only if the internal object is also cloneable or is a slice of cloneable objects.
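
A minimal sketch of that rule, assuming a plain type switch is enough to detect the two cloneable cases. The real method returns a CloneableMessage and may detect slices differently; genericMessage, clone and point below are local, hypothetical stand-ins.

```go
package main

import "fmt"

// Cloneable mirrors the documented interface.
type Cloneable interface {
	Clone() Cloneable
}

// genericMessage has the same fields as the documented GenericMessage.
type genericMessage struct {
	Subtype string
	Obj     interface{}
}

// String uses an invented format for printing.
func (gm *genericMessage) String() string {
	return fmt.Sprintf("%s:%+v", gm.Subtype, gm.Obj)
}

// clone returns nil unless Obj is a Cloneable or a slice of Cloneable.
func (gm *genericMessage) clone() *genericMessage {
	switch v := gm.Obj.(type) {
	case Cloneable:
		return &genericMessage{Subtype: gm.Subtype, Obj: v.Clone()}
	case []Cloneable:
		cp := make([]Cloneable, len(v))
		for i, c := range v {
			cp[i] = c.Clone()
		}
		return &genericMessage{Subtype: gm.Subtype, Obj: cp}
	default:
		return nil
	}
}

// point is a hypothetical cloneable payload.
type point struct{ X, Y int }

func (p *point) Clone() Cloneable {
	c := *p
	return &c
}

func main() {
	gm := &genericMessage{Subtype: "pos", Obj: &point{X: 1, Y: 2}}
	cp := gm.clone()
	cp.Obj.(*point).X = 9 // does not affect gm
	fmt.Println(gm, cp)

	plain := &genericMessage{Subtype: "n", Obj: 42}
	fmt.Println(plain.clone() == nil) // an int payload is not cloneable
}
```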

func (*GenericMessage) String added in v0.0.3

func (gm *GenericMessage) String() string

type Id

type Id struct {
	Name      string
	SessionId string
}

func NewId

func NewId(sessionId, name string) *Id

func (*Id) String

func (id *Id) String() string

type Message added in v0.0.3

type Message interface {
	AsEvent() *event.Event
}

Message is the base interface of all kinds of messages.

type MessageProvider

type MessageProvider interface {
	GetName() string
	PushMessage(data Message) error
	CanHandlePayloadType(pt uint8) bool
	Priority() uint32 // multiple message providers can be ordered by priority
}

MessageProvider can push data messages to the event graph.
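
The Priority and CanHandlePayloadType methods suggest that a caller picks one provider out of several. The sketch below shows one plausible selection routine. It assumes that a higher Priority value wins, which the documentation does not state. The messageProvider interface is a trimmed local copy without PushMessage, and fixedProvider, pick and the payload type numbers are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// messageProvider is a trimmed local copy of the documented interface.
type messageProvider interface {
	GetName() string
	CanHandlePayloadType(pt uint8) bool
	Priority() uint32
}

// fixedProvider is a hypothetical provider with a static set of
// payload types.
type fixedProvider struct {
	name     string
	priority uint32
	types    map[uint8]bool
}

func (p fixedProvider) GetName() string                    { return p.name }
func (p fixedProvider) CanHandlePayloadType(pt uint8) bool { return p.types[pt] }
func (p fixedProvider) Priority() uint32                   { return p.priority }

// pick returns the highest-priority provider that accepts pt.
// Assumption: larger Priority values are preferred.
func pick(ps []messageProvider, pt uint8) (messageProvider, error) {
	sorted := append([]messageProvider(nil), ps...) // leave the input untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Priority() > sorted[j].Priority()
	})
	for _, p := range sorted {
		if p.CanHandlePayloadType(pt) {
			return p, nil
		}
	}
	return nil, fmt.Errorf("no provider handles payload type %d", pt)
}

func main() {
	ps := []messageProvider{
		fixedProvider{"fallback", 1, map[uint8]bool{0: true, 8: true}},
		fixedProvider{"audio", 10, map[uint8]bool{0: true}},
	}
	for _, pt := range []uint8{0, 8, 96} {
		p, err := pick(ps, pt)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(pt, "->", p.GetName())
	}
}
```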

type PubSubNode

type PubSubNode struct {
	SessionNode
	event.NodeProperty
	// contains filtered or unexported fields
}

func (*PubSubNode) OnEvent

func (p *PubSubNode) OnEvent(evt *event.Event)

func (*PubSubNode) OnExit

func (p *PubSubNode) OnExit()

OnExit closes all channel subscribers.

func (*PubSubNode) OnLinkDown

func (p *PubSubNode) OnLinkDown(_ int, scope string, nodeName string)

func (*PubSubNode) Publish

func (p *PubSubNode) Publish(obj CloneableMessage)

func (*PubSubNode) SetPipeOut

func (p *PubSubNode) SetPipeOut(session, name string) error

SetPipeOut overrides the default session node behaviour: it allows multiple pipes simultaneously (that is what "pubsub" stands for) instead of only one data output pipe.

func (*PubSubNode) SubscribeChannel

func (p *PubSubNode) SubscribeChannel(chName string, c chan<- *event.Event) error

SubscribeChannel adds a channel as a subscriber of this pubsub node.

func (*PubSubNode) SubscribeNode

func (p *PubSubNode) SubscribeNode(scope, name string) error

SubscribeNode adds a node as a subscriber of this pubsub node.

func (*PubSubNode) UnsubscribeChannel

func (p *PubSubNode) UnsubscribeChannel(chName string) error

UnsubscribeChannel removes the channel subscriber with the given name from this pubsub node.

func (*PubSubNode) UnsubscribeNode

func (p *PubSubNode) UnsubscribeNode(scope, name string) error

UnsubscribeNode removes a node subscriber from this pubsub node.
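
How channel subscribers and a delivery timeout such as PubsubDefaultDeliveryTimeout might fit together can be sketched with a small hub. This is an assumption about the design, not the package's code: hub, subscribe, unsubscribe and publish are hypothetical, messages are plain strings instead of events, and the hub is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// deliveryTimeout mirrors the value of PubsubDefaultDeliveryTimeout.
const deliveryTimeout = 20 * time.Millisecond

// hub is a hypothetical, single-goroutine pubsub core.
type hub struct{ subs map[string]chan<- string }

func newHub() *hub { return &hub{subs: make(map[string]chan<- string)} }

// subscribe registers a named channel; duplicate names are rejected
// (an assumption).
func (h *hub) subscribe(name string, c chan<- string) error {
	if _, ok := h.subs[name]; ok {
		return fmt.Errorf("channel %q already subscribed", name)
	}
	h.subs[name] = c
	return nil
}

// unsubscribe removes a named channel.
func (h *hub) unsubscribe(name string) error {
	if _, ok := h.subs[name]; !ok {
		return fmt.Errorf("channel %q not subscribed", name)
	}
	delete(h.subs, name)
	return nil
}

// publish offers msg to every subscriber and gives up on a subscriber
// that does not accept it within deliveryTimeout. It returns the names
// of the subscribers that were skipped.
func (h *hub) publish(msg string) (dropped []string) {
	for name, c := range h.subs {
		timer := time.NewTimer(deliveryTimeout)
		select {
		case c <- msg:
			timer.Stop()
		case <-timer.C:
			dropped = append(dropped, name)
		}
	}
	return dropped
}

func main() {
	h := newHub()
	fast := make(chan string, 1) // buffered: accepts immediately
	slow := make(chan string)    // unbuffered with no reader: times out
	if err := h.subscribe("fast", fast); err != nil {
		panic(err)
	}
	if err := h.subscribe("slow", slow); err != nil {
		panic(err)
	}
	dropped := h.publish("hello")
	fmt.Println(<-fast, dropped)
}
```

The timeout keeps one stalled subscriber from blocking delivery to the others indefinitely, at the cost of that subscriber missing the message.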

type RawByteMessage added in v0.0.3

type RawByteMessage []byte

RawByteMessage is used to pass byte-stream data between nodes, within a session or across sessions, for efficiency.

func NewRawByteMessage added in v0.0.3

func NewRawByteMessage(d string) RawByteMessage

func (RawByteMessage) AsEvent added in v0.0.3

func (m RawByteMessage) AsEvent() *event.Event

func (RawByteMessage) Clone added in v0.0.3

func (m RawByteMessage) Clone() CloneableMessage

func (RawByteMessage) String added in v0.0.3

func (m RawByteMessage) String() string

type SessionAware

type SessionAware interface {
	event.Node

	// ConfigProperties handles props that cannot be configured by simple reflection
	ConfigProperties(ci []*nmd.NodeProp)

	// Init performs initialization after the node is allocated and configured
	Init() error

	// SetPipeOut specifies the data endpoint to which this node outputs
	SetPipeOut(session, name string) error

	SetController(ctrl Controller)

	// ExitGraph is used when initialization failed or the session terminated
	ExitGraph()
}

SessionAware enables a node to:

1. configure its static properties before any event flows
2. set its data output destination
3. exit the graph when the session ends
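
The three capabilities can be read as a small lifecycle. The sketch below is one possible driver for it, not the composer's actual logic: sessionAware is a trimmed local interface whose ConfigProperties takes a map instead of []*nmd.NodeProp, and setup, fakeNode and the order of the calls are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// sessionAware is a trimmed local stand-in for the documented interface.
type sessionAware interface {
	ConfigProperties(props map[string]string)
	Init() error
	SetPipeOut(session, name string) error
	ExitGraph()
}

// setup configures and initializes a node, then sets its output.
// On any failure it asks the node to exit the graph, following the
// documented purpose of ExitGraph.
func setup(n sessionAware, props map[string]string, outSession, outName string) error {
	n.ConfigProperties(props)
	if err := n.Init(); err != nil {
		n.ExitGraph()
		return fmt.Errorf("init: %w", err)
	}
	if err := n.SetPipeOut(outSession, outName); err != nil {
		n.ExitGraph()
		return fmt.Errorf("set pipe out: %w", err)
	}
	return nil
}

// fakeNode is a hypothetical node that records what was done to it.
type fakeNode struct {
	failInit bool
	out      string
	exited   bool
}

func (f *fakeNode) ConfigProperties(map[string]string) {}

func (f *fakeNode) Init() error {
	if f.failInit {
		return errors.New("init failed")
	}
	return nil
}

func (f *fakeNode) SetPipeOut(session, name string) error {
	f.out = session + "/" + name
	return nil
}

func (f *fakeNode) ExitGraph() { f.exited = true }

func main() {
	good := &fakeNode{}
	fmt.Println(setup(good, nil, "s1", "next"), good.out, good.exited)

	bad := &fakeNode{failInit: true}
	fmt.Println(setup(bad, nil, "s1", "next"), bad.exited)
}
```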

func MakeSessionNode

func MakeSessionNode(nodeType string, sessionId string, props []*nmd.NodeProp) SessionAware

MakeSessionNode is the factory method for all session-aware nodes.
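
RegisterNodeFactory and MakeSessionNode together suggest a registry keyed by type name. The sketch below shows that pattern in isolation. The registry type, its methods, the duplicate-name error and the nil result for unknown types are assumptions, and node is a hypothetical stand-in for SessionAware.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for SessionAware.
type node interface{ Kind() string }

// nodeFactory mirrors the shape of SessionNodeFactory.
type nodeFactory func() node

// registry maps type names to factories and is safe for concurrent use.
type registry struct {
	mu        sync.RWMutex
	factories map[string]nodeFactory
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]nodeFactory)}
}

// register adds a factory; registering the same name twice is treated
// as an error (an assumption).
func (r *registry) register(typeName string, f nodeFactory) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.factories[typeName]; dup {
		return fmt.Errorf("node type %q is already registered", typeName)
	}
	r.factories[typeName] = f
	return nil
}

// create builds a node of the given type, or returns nil if the type
// is unknown (an assumption).
func (r *registry) create(typeName string) node {
	r.mu.RLock()
	f, ok := r.factories[typeName]
	r.mu.RUnlock()
	if !ok {
		return nil
	}
	return f()
}

// echoNode is a hypothetical node type.
type echoNode struct{}

func (echoNode) Kind() string { return "echo" }

func main() {
	r := newRegistry()
	if err := r.register("echo", func() node { return echoNode{} }); err != nil {
		panic(err)
	}
	fmt.Println(r.create("echo").Kind())
	fmt.Println(r.create("missing") == nil)
}
```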

type SessionNode

type SessionNode struct {
	Id
	// contains filtered or unexported fields
}

SessionNode is the base class of all nodes that provide capability in an RTP session

func (*SessionNode) Call

func (s *SessionNode) Call(session, name string, args []string) (resp []string)

Call forwards to the controller.

func (*SessionNode) CallSys added in v0.0.3

func (s *SessionNode) CallSys(name string, args []string) (resp []string)

func (*SessionNode) Cast

func (s *SessionNode) Cast(session, name string, args []string)

Cast forwards to the controller.

func (*SessionNode) CastSys added in v0.0.3

func (s *SessionNode) CastSys(name string, args []string)

func (*SessionNode) ConfigProperties

func (s *SessionNode) ConfigProperties(_ []*nmd.NodeProp)

func (*SessionNode) DataPipeReady

func (s *SessionNode) DataPipeReady() bool

DataPipeReady reports whether the data link is established.

func (*SessionNode) ExitGraph

func (s *SessionNode) ExitGraph()

func (*SessionNode) GetNodeName

func (s *SessionNode) GetNodeName() string

func (*SessionNode) GetNodeScope

func (s *SessionNode) GetNodeScope() string

func (*SessionNode) Init

func (s *SessionNode) Init() error

func (*SessionNode) OnEnter

func (s *SessionNode) OnEnter(delegate *event.NodeDelegate)

func (*SessionNode) OnEvent

func (s *SessionNode) OnEvent(evt *event.Event)

func (*SessionNode) OnExit

func (s *SessionNode) OnExit()

func (*SessionNode) OnLinkDown

func (s *SessionNode) OnLinkDown(linkId int, scope string, nodeName string)

func (*SessionNode) SendEvent added in v0.0.3

func (s *SessionNode) SendEvent(evt *event.Event) (err error)

func (*SessionNode) SendMessage added in v0.0.3

func (s *SessionNode) SendMessage(msg Message) (err error)

SendMessage is a utility method that sends a data message to the next node.

func (*SessionNode) SetController

func (s *SessionNode) SetController(ctrl Controller)

func (*SessionNode) SetPipeOut

func (s *SessionNode) SetPipeOut(session, name string) error

type SessionNodeFactory

type SessionNodeFactory func() SessionAware

type SystemNode added in v0.0.3

type SystemNode struct {
	Id
	// contains filtered or unexported fields
}

SystemNode is the base class of all system-wide nodes that keep running throughout the graph's lifetime.

func (*SystemNode) GetNodeName added in v0.0.3

func (s *SystemNode) GetNodeName() string

func (*SystemNode) GetNodeScope added in v0.0.3

func (s *SystemNode) GetNodeScope() string

func (*SystemNode) OnEnter added in v0.0.3

func (s *SystemNode) OnEnter(delegate *event.NodeDelegate)

func (*SystemNode) OnEvent added in v0.0.3

func (s *SystemNode) OnEvent(evt *event.Event)

func (*SystemNode) OnExit added in v0.0.3

func (s *SystemNode) OnExit()

func (*SystemNode) OnLinkDown added in v0.0.3

func (s *SystemNode) OnLinkDown(linkId int, scope string, nodeName string)
