runtime

package
v0.23.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package runtime implements environment for dataflow programs execution.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStartPortNotFound = errors.New("start port not found")
	ErrExitPortNotFound  = errors.New("stop port not found")
	ErrConnector         = errors.New("connector")
	ErrFuncRunner        = errors.New("func runner")
)
View Source
var ErrNilDeps = errors.New("runtime deps nil")
View Source
var ErrSinglePortCount = errors.New("number of ports found by name not equals to one")

Functions

This section is empty.

Types

type BoolMsg

type BoolMsg struct {
	// contains filtered or unexported fields
}

func NewBoolMsg

func NewBoolMsg(b bool) BoolMsg

func (BoolMsg) Bool

func (msg BoolMsg) Bool() bool

func (BoolMsg) Float

func (BoolMsg) Float() float64

func (BoolMsg) Int

func (BoolMsg) Int() int64

func (BoolMsg) List

func (BoolMsg) List() []Msg

func (BoolMsg) Map

func (BoolMsg) Map() map[string]Msg

func (BoolMsg) MarshalJSON added in v0.20.0

func (msg BoolMsg) MarshalJSON() ([]byte, error)

func (BoolMsg) Str

func (BoolMsg) Str() string

func (BoolMsg) String

func (msg BoolMsg) String() string

func (BoolMsg) Type

func (msg BoolMsg) Type() MsgType

type Connection

type Connection struct {
	Sender    chan Msg
	Receivers []chan Msg
	Meta      ConnectionMeta
}

type ConnectionMeta

type ConnectionMeta struct {
	SenderPortAddr    PortAddr
	ReceiverPortAddrs []PortAddr
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func NewConnector added in v0.6.3

func NewConnector(lis EventListener) Connector

func NewDefaultConnector

func NewDefaultConnector() Connector

func (Connector) Connect

func (c Connector) Connect(ctx context.Context, conns []Connection)

type EmptyListener

type EmptyListener struct{}

func (EmptyListener) Send

func (l EmptyListener) Send(_ Event, msg Msg) Msg

type Event

type Event struct {
	Type            EventType
	MessageSent     *EventMessageSent
	MessagePending  *EventMessagePending
	MessageReceived *EventMessageReceived
}

func (Event) String

func (e Event) String() string

type EventListener

type EventListener interface {
	Send(event Event, msg Msg) Msg
}

type EventMessagePending

type EventMessagePending struct {
	Meta             ConnectionMeta // We can use sender from here and receivers just as a handy metadata
	ReceiverPortAddr PortAddr       // So what we really need is sender and receiver port addrs
}

EventMessagePending describes event when message has reached receiver but not yet passed inside. It's usefull only for interception and modifying message for specific receiver.

func (EventMessagePending) String

func (e EventMessagePending) String() string

type EventMessageReceived

type EventMessageReceived struct {
	Meta             ConnectionMeta // Same as with pending event
	ReceiverPortAddr PortAddr
}

func (EventMessageReceived) String

func (e EventMessageReceived) String() string

type EventMessageSent

type EventMessageSent struct {
	SenderPortAddr    PortAddr
	ReceiverPortAddrs map[PortAddr]struct{} // We use map to work with breakpoints
}

func (EventMessageSent) String

func (e EventMessageSent) String() string

type EventType

type EventType uint8
const (
	MessageSentEvent     EventType = 1 // Message is sent from sender to its receivers
	MessagePendingEvent  EventType = 2 // Message has reached receiver but not yet passed inside
	MessageReceivedEvent EventType = 3 // Message is passed inside receiver
)

func (EventType) String

func (e EventType) String() string

type FloatMsg

type FloatMsg struct {
	// contains filtered or unexported fields
}

func NewFloatMsg

func NewFloatMsg(n float64) FloatMsg

func (FloatMsg) Bool

func (FloatMsg) Bool() bool

func (FloatMsg) Float

func (msg FloatMsg) Float() float64

func (FloatMsg) Int

func (FloatMsg) Int() int64

func (FloatMsg) List

func (FloatMsg) List() []Msg

func (FloatMsg) Map

func (FloatMsg) Map() map[string]Msg

func (FloatMsg) MarshalJSON added in v0.20.0

func (msg FloatMsg) MarshalJSON() ([]byte, error)

func (FloatMsg) Str

func (FloatMsg) Str() string

func (FloatMsg) String

func (msg FloatMsg) String() string

func (FloatMsg) Type

func (msg FloatMsg) Type() MsgType

type FuncCall

type FuncCall struct {
	Ref       string
	IO        FuncIO
	ConfigMsg Msg
}

type FuncCreator

type FuncCreator interface {
	// Create method validates the input and builds ready to use function
	Create(funcIO FuncIO, msg Msg) (func(context.Context), error)
}

type FuncIO

type FuncIO struct {
	In, Out FuncPorts
}

type FuncPorts

type FuncPorts map[string][]chan Msg

FuncPorts is data structure that runtime functions must use at startup to get needed ports. Its methods can return error because it's okay to fail at startup. Keys are port names and values are slots.

func (FuncPorts) Port

func (f FuncPorts) Port(name string) (chan Msg, error)

Port returns first slot of port found by the given name. It returns error if port is not found or if it's not a single port.

type FuncRunner

type FuncRunner struct {
	// contains filtered or unexported fields
}

func MustNewFuncRunner

func MustNewFuncRunner(registry map[string]FuncCreator) FuncRunner

func (FuncRunner) Run

func (d FuncRunner) Run(funcCalls []FuncCall) (func(ctx context.Context), error)

type IntMsg

type IntMsg struct {
	// contains filtered or unexported fields
}

func NewIntMsg

func NewIntMsg(n int64) IntMsg

func (IntMsg) Bool

func (IntMsg) Bool() bool

func (IntMsg) Float

func (IntMsg) Float() float64

func (IntMsg) Int

func (msg IntMsg) Int() int64

func (IntMsg) List

func (IntMsg) List() []Msg

func (IntMsg) Map

func (IntMsg) Map() map[string]Msg

func (IntMsg) MarshalJSON added in v0.20.0

func (msg IntMsg) MarshalJSON() ([]byte, error)

func (IntMsg) Str

func (IntMsg) Str() string

func (IntMsg) String

func (msg IntMsg) String() string

func (IntMsg) Type

func (msg IntMsg) Type() MsgType

type ListMsg

type ListMsg struct {
	// contains filtered or unexported fields
}

List

func NewListMsg

func NewListMsg(v ...Msg) ListMsg

func (ListMsg) Bool

func (ListMsg) Bool() bool

func (ListMsg) Float

func (ListMsg) Float() float64

func (ListMsg) Int

func (ListMsg) Int() int64

func (ListMsg) List

func (msg ListMsg) List() []Msg

func (ListMsg) Map

func (ListMsg) Map() map[string]Msg

func (ListMsg) MarshalJSON added in v0.20.0

func (msg ListMsg) MarshalJSON() ([]byte, error)

func (ListMsg) Str

func (ListMsg) Str() string

func (ListMsg) String

func (msg ListMsg) String() string

func (ListMsg) Type

func (msg ListMsg) Type() MsgType

type MapMsg

type MapMsg struct {
	// contains filtered or unexported fields
}

Map

func NewMapMsg

func NewMapMsg(m map[string]Msg) MapMsg

func (MapMsg) Bool

func (MapMsg) Bool() bool

func (MapMsg) Float

func (MapMsg) Float() float64

func (MapMsg) Int

func (MapMsg) Int() int64

func (MapMsg) List

func (MapMsg) List() []Msg

func (MapMsg) Map

func (msg MapMsg) Map() map[string]Msg

func (MapMsg) MarshalJSON added in v0.20.0

func (msg MapMsg) MarshalJSON() ([]byte, error)

func (MapMsg) Str

func (MapMsg) Str() string

func (MapMsg) String

func (msg MapMsg) String() string

func (MapMsg) Type

func (msg MapMsg) Type() MsgType

type Msg

type Msg interface {
	fmt.Stringer
	Type() MsgType
	Bool() bool
	Int() int64
	Float() float64
	Str() string
	List() []Msg
	Map() map[string]Msg
}

Msg methods don't return errors because they can be used not only at startup. If runtime functions need to validate message at startup, they must do it by themselves.

type MsgType

type MsgType uint8
const (
	UnknownMsgType MsgType = 0
	BoolMsgType    MsgType = 1
	IntMsgType     MsgType = 2
	FloatMsgType   MsgType = 3
	StrMsgType     MsgType = 4
	ListMsgType    MsgType = 5
	MapMsgType     MsgType = 6
)

type PortAddr

type PortAddr struct {
	Path string // Path is needed to distinguish ports with the same name
	Port string // Separate port field is needed for functions
	Idx  uint8
}

func (PortAddr) String

func (p PortAddr) String() string

type Ports

type Ports map[PortAddr]chan Msg

type Program

type Program struct {
	Ports       Ports
	Connections []Connection
	Funcs       []FuncCall
}

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

func New

func New(connector Connector, funcRunner FuncRunner) Runtime

func (Runtime) Run

func (r Runtime) Run(ctx context.Context, prog Program) error

type StrMsg

type StrMsg struct {
	// contains filtered or unexported fields
}

func NewStrMsg

func NewStrMsg(s string) StrMsg

func (StrMsg) Bool

func (StrMsg) Bool() bool

func (StrMsg) Float

func (StrMsg) Float() float64

func (StrMsg) Int

func (StrMsg) Int() int64

func (StrMsg) List

func (StrMsg) List() []Msg

func (StrMsg) Map

func (StrMsg) Map() map[string]Msg

func (StrMsg) MarshalJSON added in v0.20.0

func (msg StrMsg) MarshalJSON() ([]byte, error)

func (StrMsg) Str

func (msg StrMsg) Str() string

func (StrMsg) String

func (msg StrMsg) String() string

func (StrMsg) Type

func (msg StrMsg) Type() MsgType

Directories

Path Synopsis
Package funcs implements low-level components (runtime functions).
Package funcs implements low-level components (runtime functions).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL