chaperone

package module
v0.0.0-...-03fae82 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 5, 2024 | License: BSD-3-Clause | Imports: 8 | Imported by: 1

README

chaperone

Documentation

Index

Constants

View Source
const DefaultErrorLevel = ErrorLevelInfo

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Debug bool
}

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

func (*Edge) Close

func (e *Edge) Close()

func (*Edge) GetChannel

func (e *Edge) GetChannel() chan Message

func (*Edge) Name

func (e *Edge) Name() string

func (*Edge) Send

func (e *Edge) Send(env Message) error

func (*Edge) SetChannel

func (e *Edge) SetChannel(c chan Message)

type EnvHandler

type EnvHandler interface {
	Start(ctx context.Context) error
	Handle(ctx context.Context, env Message) (Message, error)
	Stop()
}

type Envelope

type Envelope[T Message] struct {
	Message    T
	NumRetries int
	Metadata   map[string]interface{}
}

func NewEnvelope

func NewEnvelope[T Message](message T, numRetries int) *Envelope[T]

func (*Envelope[T]) String

func (e *Envelope[T]) String() string

type EnvelopeWorker

type EnvelopeWorker interface {
	Name() string
	AddOutput(MessageCarrier)
	AddInput(MessageCarrier)
	AddWorkers(MessageCarrier, int, string, EnvHandler)
	GetHandler() EnvHandler
	SetEvents(MessageCarrier)
	Start(context.Context)
	RestartWorkers(context.Context)
	Stop(*Event)
	GetMetrics() *Metrics
}

type ErrorLevel

type ErrorLevel int

ErrorLevel represents the severity of an error.

const (
	// ErrorLevelDefault selects the default error level.
	ErrorLevelDefault ErrorLevel = iota
	// ErrorLevelDebug represents a very granular message.
	ErrorLevelDebug
	// ErrorLevelInfo represents an informational message.
	ErrorLevelInfo
	// ErrorLevelWarning represents a warning message.
	ErrorLevelWarning
	// ErrorLevelError represents an error message.
	ErrorLevelError
	// ErrorLevelCritical represents a critical error message.
	ErrorLevelCritical
	// Done is a special error level that indicates the end of a process.
	Done
)

func (ErrorLevel) Level

func (level ErrorLevel) Level() string

Level returns the string representation of the error level.

func (ErrorLevel) String

func (e ErrorLevel) String() string

type Event

type Event struct {
	Worker string
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent(level ErrorLevel, err error, env Message) *Event

func (Event) Error

func (e Event) Error() string

Error implements the error interface.

func (*Event) Event

func (e *Event) Event() string

func (*Event) Level

func (e *Event) Level() ErrorLevel

func (*Event) Message

func (e *Event) Message() Message

func (*Event) String

func (e *Event) String() string

func (*Event) Unwrap

func (e *Event) Unwrap() error

Unwrap returns the underlying error.

func (*Event) Wrap

func (e *Event) Wrap(err error) *Event

type EventWorker

type EventWorker interface {
	Name() string
	AddChildSupervisor(EventWorker)
	SetEvents(MessageCarrier)
	SetCancel(context.CancelFunc)
	Start(ctx context.Context)
	Stop()
}

type EvtHandler

type EvtHandler interface {
	Start(ctx context.Context) error
	Handle(ctx context.Context, evt Message) error
	Stop()
}

type Graph

type Graph struct {
	Name        string
	Supervisors map[string]EventWorker
	Nodes       map[string]EnvelopeWorker
	Edges       []MessageCarrier
	// contains filtered or unexported fields
}

func NewGraph

func NewGraph(name string, config *Config) *Graph

func (*Graph) AddEdge

func (g *Graph) AddEdge(edge MessageCarrier) *Graph

func (*Graph) AddNode

func (g *Graph) AddNode(supervisor EventWorker, node EnvelopeWorker) *Graph

func (*Graph) AddSupervisor

func (g *Graph) AddSupervisor(parent EventWorker, supervisor EventWorker) *Graph

func (*Graph) Metrics

func (g *Graph) Metrics(nodes []string) []*Metrics

Metrics returns the metrics for the given nodes. If nodes is nil, it returns metrics for all nodes. Node names are matched case-sensitively.

func (*Graph) Start

func (g *Graph) Start(ctx context.Context) *Graph

func (*Graph) Stop

func (g *Graph) Stop() *Graph

type Message

type Message interface {
	String() string
}

type MessageCarrier

type MessageCarrier interface {
	Name() string
	Send(env Message) error
	GetChannel() chan Message
	SetChannel(chan Message)
	Close()
}

func NewEdge

func NewEdge(name string, fn EnvelopeWorker, tn EnvelopeWorker, bufferSize int, numWorkers int) MessageCarrier

type Metrics

type Metrics struct {
	NodeName   string
	BitRate    uint64 // bits per second
	PacketRate uint64 // packets per second
	ErrorRate  uint64 // errors per second
	AvgDepth   uint64 // packets waiting in queue
}

type Node

type Node[In, Out Message] struct {
	Handler         EnvHandler
	LoopbackHandler EnvHandler
	WorkerPool      map[string][]*Worker
	WorkerCounter   uint64
	RunningWorkers  int64

	In     map[string]MessageCarrier
	Out    OutMux
	Events MessageCarrier
	// contains filtered or unexported fields
}

func NewNode

func NewNode[In, Out Message](name string, handler, lbHandler EnvHandler) *Node[In, Out]

func (*Node[In, Out]) AddInput

func (n *Node[In, Out]) AddInput(edge MessageCarrier)

func (*Node[In, Out]) AddOutput

func (n *Node[In, Out]) AddOutput(edge MessageCarrier)

func (*Node[In, Out]) AddWorkers

func (n *Node[In, Out]) AddWorkers(edge MessageCarrier, num int, name string, handler EnvHandler)

func (*Node[In, Out]) GetHandler

func (n *Node[In, Out]) GetHandler() EnvHandler

func (*Node[In, Out]) GetMetrics

func (n *Node[In, Out]) GetMetrics() *Metrics

func (*Node[In, Out]) Name

func (n *Node[In, Out]) Name() string

func (*Node[In, Out]) Restart

func (n *Node[In, Out]) Restart(evt *Event)

func (*Node[In, Out]) RestartWorkers

func (n *Node[In, Out]) RestartWorkers(ctx context.Context)

func (*Node[In, Out]) RunningWorkerCount

func (n *Node[In, Out]) RunningWorkerCount() int

func (*Node[In, Out]) SendEvent

func (n *Node[In, Out]) SendEvent(evt *Event)

func (*Node[In, Out]) SetEvents

func (n *Node[In, Out]) SetEvents(edge MessageCarrier)

func (*Node[In, Out]) Start

func (n *Node[In, Out]) Start(ctx context.Context)

func (*Node[In, Out]) Stop

func (n *Node[In, Out]) Stop(evt *Event)

func (*Node[In, Out]) StopWorkers

func (n *Node[In, Out]) StopWorkers()

type OutMux

type OutMux struct {
	Name     string
	OutChans map[string]MessageCarrier
	GoChans  map[string]MessageCarrier
}

func NewOutMux

func NewOutMux(name string) OutMux

func (*OutMux) AddChannel

func (o *OutMux) AddChannel(edge MessageCarrier)

func (*OutMux) Send

func (o *OutMux) Send(env Message)

type Supervisor

type Supervisor struct {
	Events       MessageCarrier
	ParentEvents MessageCarrier
	Supervisors  map[string]EventWorker
	Nodes        map[string]EnvelopeWorker
	Handler      EvtHandler
	// contains filtered or unexported fields
}

func NewSupervisor

func NewSupervisor(name string, handler EvtHandler) *Supervisor

func (*Supervisor) AddChildSupervisor

func (s *Supervisor) AddChildSupervisor(supervisor EventWorker)

func (*Supervisor) Name

func (s *Supervisor) Name() string

func (*Supervisor) SetCancel

func (s *Supervisor) SetCancel(cancel context.CancelFunc)

func (*Supervisor) SetEvents

func (s *Supervisor) SetEvents(edge MessageCarrier)

func (*Supervisor) Start

func (s *Supervisor) Start(ctx context.Context)

func (*Supervisor) Stop

func (s *Supervisor) Stop()

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL