ctl

package
v0.0.0-...-bfb58f9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2021 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxNodeStatusQueueSize = 50
	MaxNodeResultQueueSize = 50
	StreamConnectTimeout   = time.Second
)

Variables

View Source
var (
	ErrNotAvailableNode = errors.New("ctl: cannot find suitable node")
)

Functions

func NodeUtilizationIndex

func NodeUtilizationIndex(s *Status) float64

NodeUtilizationIndex calculates a utilization index based on the internal consumption of system resources in a node. The higher the value the more busy will be the node, so less eligible. This function will return an infinite positive value in case of any resource exceeds the limit threshold, indicating that the node should not be chosen. We cannot assume that this function will always return a percentage. Its just an index, the lower, the best.

The current implementation is a simplistic one (see comments) and works on a best effort.

func ScoreNode

func ScoreNode(d *Node)

ScoreNode calculates a general score for the Node passed and sets the value in the Node struct. This function tends to include all the available calculations in order to nurture a priority queue. The more the score the more eligible will be the node. Negative scoring is possible.

Types

type AddCapturerRequest

type AddCapturerRequest struct {
	UUID string
	URL  string
}

type Capturer

type Capturer struct {
	UUID   string
	URL    string
	Status string
}

type ClientGenerator

type ClientGenerator func(addr string, l *logrus.Logger, service *metrics.Service) NodeClient

type Ctl

type Ctl struct {
	// contains filtered or unexported fields
}

func NewCtl

func NewCtl(logger *logrus.Logger, metricsService *metrics.Service, clientGen ClientGenerator) *Ctl

func (*Ctl) AddCapturer

func (c *Ctl) AddCapturer(ctx context.Context, uid, url string) error

func (*Ctl) AddNode

func (c *Ctl) AddNode(addr string) (string, error)

func (*Ctl) LoadCategories

func (c *Ctl) LoadCategories(ctx context.Context, categories []string, image []byte) error

func (*Ctl) Shutdown

func (c *Ctl) Shutdown(ctx context.Context) error

type GRPCNodeClient

type GRPCNodeClient struct {
	// contains filtered or unexported fields
}

func NewGRPCNodeClient

func NewGRPCNodeClient(addr string, logger *logrus.Logger, metricsRegistry *metrics.Service) *GRPCNodeClient

func (*GRPCNodeClient) AddCapturer

func (c *GRPCNodeClient) AddCapturer(ctx context.Context, request *AddCapturerRequest) error

func (*GRPCNodeClient) Connect

func (c *GRPCNodeClient) Connect() error

func (*GRPCNodeClient) LoadCategories

func (c *GRPCNodeClient) LoadCategories(ctx context.Context, request *LoadCategoriesRequest) error

func (*GRPCNodeClient) NextResult

func (c *GRPCNodeClient) NextResult() (*Result, error)

func (*GRPCNodeClient) NextStatus

func (c *GRPCNodeClient) NextStatus() (*Status, error)

func (*GRPCNodeClient) RemoveCapturer

func (c *GRPCNodeClient) RemoveCapturer(ctx context.Context, request *RemoveCapturerRequest) error

func (*GRPCNodeClient) Shutdown

func (c *GRPCNodeClient) Shutdown() error

type HeapNodePriorityQueue

type HeapNodePriorityQueue struct {
	// contains filtered or unexported fields
}

HeapNodePriorityQueue is an implementation of NodePriorityQueue based on a heap. This necessary implements heap.Interface, as we are using the out of the box heap of the std lib. Such methods should be used only internally by the std lib.

func NewHeapNodePriorityQueue

func NewHeapNodePriorityQueue() *HeapNodePriorityQueue

func (*HeapNodePriorityQueue) Len

func (h *HeapNodePriorityQueue) Len() int

func (*HeapNodePriorityQueue) Less

func (h *HeapNodePriorityQueue) Less(i, j int) bool

func (*HeapNodePriorityQueue) Next

func (h *HeapNodePriorityQueue) Next() *Node

func (*HeapNodePriorityQueue) Pop

func (h *HeapNodePriorityQueue) Pop() interface{}

func (*HeapNodePriorityQueue) Push

func (h *HeapNodePriorityQueue) Push(x interface{})

func (*HeapNodePriorityQueue) Remove

func (h *HeapNodePriorityQueue) Remove(uuid string) error

func (*HeapNodePriorityQueue) Swap

func (h *HeapNodePriorityQueue) Swap(i, j int)

func (*HeapNodePriorityQueue) Upsert

func (h *HeapNodePriorityQueue) Upsert(node *Node)

type LoadAverage

type LoadAverage struct {
	Avg1  float64
	Avg5  float64
	Avg15 float64
}

type LoadCategoriesRequest

type LoadCategoriesRequest struct {
	Categories []string
	Image      []byte
}

type Memory

type Memory struct {
	UsedMemoryBytes  uint64
	TotalMemoryBytes uint64
}

type Network

type Network struct {
	RxBytesSec uint64
	TxBytesSec uint64
}

type Node

type Node struct {
	UUID   string
	Addr   string
	Index  int
	Score  float64
	Status *Status
}

type NodeClient

type NodeClient interface {
	Connect() error
	LoadCategories(ctx context.Context, r *LoadCategoriesRequest) error
	AddCapturer(ctx context.Context, r *AddCapturerRequest) error
	RemoveCapturer(ctx context.Context, r *RemoveCapturerRequest) error
	NextStatus() (*Status, error)
	NextResult() (*Result, error)
	Shutdown() error
}

type NodeHandler

type NodeHandler struct {
	// contains filtered or unexported fields
}

func NewNodeHandler

func NewNodeHandler(node *Node, client NodeClient, logger *logrus.Logger) *NodeHandler

func (*NodeHandler) LoadCategories

func (ds *NodeHandler) LoadCategories(ctx context.Context, categories []string, image []byte) error

func (*NodeHandler) Shutdown

func (ds *NodeHandler) Shutdown() error

func (*NodeHandler) Start

func (ds *NodeHandler) Start() error

type NodePriorityQueue

type NodePriorityQueue interface {
	// Upsert will add the *Node passed as argument to the queue.
	// If the element already exists it will replace it.
	Upsert(*Node)
	Len() int
	Remove(string) error
	// Next must return the next most suitable *Node for doing some task.
	// When the queue is empty, nil should be returned.
	Next() *Node
}

NodePriorityQueue defines the interfaces needed for interacting with the scheduler. Multiple implementations based on different criteria are expected.

type NodeRegistry

type NodeRegistry struct {
	// contains filtered or unexported fields
}

func NewNodeRegistry

func NewNodeRegistry(nodeQueue NodePriorityQueue, logger *logrus.Logger) *NodeRegistry

func (*NodeRegistry) Add

func (r *NodeRegistry) Add(s *NodeHandler)

func (*NodeRegistry) Find

func (r *NodeRegistry) Find(_ string) (NodeClient, error)

func (*NodeRegistry) LoadCategories

func (r *NodeRegistry) LoadCategories(ctx context.Context, categories []string, image []byte) error

func (*NodeRegistry) ShutdownAll

func (r *NodeRegistry) ShutdownAll(ctx context.Context) error

type RemoveCapturerRequest

type RemoveCapturerRequest struct {
	UUID string
}

type Result

type Result struct {
	CapturerUUID  string
	Recognized    []string
	TotalEntities int32
	RecognizedAt  time.Time
	CapturedAt    time.Time
}

type Server

type Server struct {
	Ctl *Ctl

	L *sync.Mutex
	// contains filtered or unexported fields
}

func New

func New(opts ...config.Option) (*Server, error)

func NewFromEnv

func NewFromEnv() (*Server, error)

func NewWith

func NewWith(metricsService *metrics.Service, logger *logrus.Logger, opts ...config.Option) *Server

func (*Server) Shutdown

func (s *Server) Shutdown()

func (*Server) Start

func (s *Server) Start() error

type Status

type Status struct {
	Description string
	Capturers   []*Capturer
	System      System
}

type System

type System struct {
	CPUCount    int
	Network     Network
	LoadAverage LoadAverage
	Memory      Memory
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL