controlapi

package
v0.0.0-...-e660703 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

README

NEX Control API

The NATS execution engine control API is a protocol by which clients can interact with and control NEX node processes. This protocol defines operations for things like ping, info, and running workloads.

Most of the protocol is fairly straightforward, but submitting a workload for execution requires a few pieces of information.

  • jwt - Any workload publisher must sign a set of claims containing the hash field, which asserts the issuer of a file with that specific hash.
  • Encrypted environment - When sending a workload for execution, you'll typically need to set a number of environment variables (e.g. to establish a NATS or DB or HTTP connection). These environment variables contain sensitive information and so are not transmitted in plain text via NATS. They are encrypted with the sender's Xkey, targeting the recipient's Xkey. The recipient is the node to which the workload is being sent, and its public key can be obtained by querying the node's info.
  • Sender public Xkey - the publisher needs to send its own public Xkey along in the request for execution so that the target node can decrypt the environment.

Manually taking these steps, either through the nats CLI or through your own code, can be tedious and error prone, so we recommend using this package for communicating with Nex nodes.

Documentation

Index

Constants

View Source
const (
	AgentStartedEventType    = "agent_started"
	AgentStoppedEventType    = "agent_stopped"
	NodeStartedEventType     = "node_started"
	NodeStoppedEventType     = "node_stopped"
	LameDuckEnteredEventType = "node_entered_lameduck"
	HeartbeatEventType       = "heartbeat"
	WorkloadStartedEventType = "workload_started" // FIXME-- should this be WorkloadDeployed?
	WorkloadStoppedEventType = "workload_stopped" // FIXME-- should this be in addition to WorkloadUndeployed (likely yes, in case of something bad happening...)

)
View Source
const (
	AuctionResponseType  = "io.nats.nex.v1.auction_response"
	InfoResponseType     = "io.nats.nex.v1.info_response"
	PingResponseType     = "io.nats.nex.v1.ping_response"
	RunResponseType      = "io.nats.nex.v1.run_response"
	StopResponseType     = "io.nats.nex.v1.stop_response"
	LameDuckResponseType = "io.nats.nex.v1.lameduck_response"

	TagOS       = "nex.os"
	TagArch     = "nex.arch"
	TagCPUs     = "nex.cpucount"
	TagUnsafe   = "nex.unsafe"
	TagLameDuck = "nex.lameduck"
)
View Source
const (
	APIPrefix = "$NEX"
)

Variables

This section is empty.

Functions

func CreateWorkloadJwt

func CreateWorkloadJwt(hash string, name string, issuer nkeys.KeyPair) (string, error)

func EncryptRequestEnvironment

func EncryptRequestEnvironment(senderXKey nkeys.KeyPair, recipientPublicKey string, env map[string]string) (string, error)

Types

type AgentStartedEvent

type AgentStartedEvent struct {
	AgentVersion string `json:"agent_version"`
}

type AgentStoppedEvent

type AgentStoppedEvent struct {
	Message string `json:"message"`
	Code    int    `json:"code"`
}

type AuctionRequest

type AuctionRequest struct {
	Arch          *string              `json:"arch,omitempty"`
	OS            *string              `json:"os,omitempty"`
	Sandboxed     *bool                `json:"sandboxed,omitempty"`
	Tags          map[string]string    `json:"tags,omitempty"`
	WorkloadTypes []models.NexWorkload `json:"workload_types,omitempty"`
}

type AuctionResponse

type AuctionResponse PingResponse

type Client

type Client struct {
	// contains filtered or unexported fields
}

A control API client communicates with a "Nexus" of nodes by virtue of the $NEX.> subject space. This client should be used to communicate with Nex nodes whenever possible, and its patterns should be copied for clients in other languages. Requests made to the $NEX.> subject space are, when appropriate, secured via signed JWTs

func NewApiClient

func NewApiClient(nc *nats.Conn, timeout time.Duration, log *slog.Logger) *Client

Creates a new client to communicate with a group of NEX nodes, using the namespace of 'default' for applicable requests

func NewApiClientWithNamespace

func NewApiClientWithNamespace(nc *nats.Conn, timeout time.Duration, namespace string, log *slog.Logger) *Client

Creates a new client to communicate with a group of Nex nodes with workloads scoped to the given namespace. Note that this namespace is used for requests where it is mandatory

func (*Client) Auction

func (api *Client) Auction(req *AuctionRequest) ([]AuctionResponse, error)

Attempts to resolve viable candidate nodes where a proposed workload can be deployed

func (*Client) EnterLameDuck

func (api *Client) EnterLameDuck(nodeId string) (*LameDuckResponse, error)

func (*Client) MonitorAllEvents

func (api *Client) MonitorAllEvents() (chan EmittedEvent, error)

A convenience function that monitors all available events without filter, and uses an unbuffered (blocking) channel for the results

func (*Client) MonitorAllLogs

func (api *Client) MonitorAllLogs() (chan EmittedLog, error)

A convenience function that subscribes to all available logs and returns an unbuffered, blocking channel

func (*Client) MonitorEvents

func (api *Client) MonitorEvents(
	namespaceFilter string,
	eventTypeFilter string,
	bufferLength int) (chan EmittedEvent, error)

Creates a NATS subscription to the appropriate event subject. If you don't want to limit the monitor to a specific namespace or event type, then supply '*' for both values, not an empty string. Buffer length is the size of the channel buffer, where 0 is unbuffered (blocking)

func (*Client) MonitorLogs

func (api *Client) MonitorLogs(
	namespaceFilter string,
	nodeFilter string,
	workloadFilter string,
	vmFilter string,
	bufferLength int) (chan EmittedLog, error)

Creates a NATS subscription to the appropriate log subject. If you do not want to limit the monitor by any of the filters, supply a '*', not an empty string. Buffer length refers to the size of the channel buffer, where 0 is unbuffered (aka blocking)

func (*Client) NodeInfo

func (api *Client) NodeInfo(nodeId string) (*InfoResponse, error)

Requests information for a given node within the client's namespace

func (*Client) PingNodes

func (api *Client) PingNodes() ([]PingResponse, error)

Attempts to list all nodes. Note that any node within the Nexus will respond to this ping, regardless of the namespaces of their running workloads

func (*Client) PingWorkloads

func (api *Client) PingWorkloads(workloadID string) ([]WorkloadPingResponse, error)

This is a filtered node ping that returns only matching workloads. A workloadId of "" will not filter by workload, and only filter by the client's namespace. If a workload ID/name is supplied, the filter will be for both namespace and workload. If you don't want these filters then use PingNodes

func (*Client) StartWorkload

func (api *Client) StartWorkload(request *DeployRequest) (*RunResponse, error)

Attempts to start a workload. The workload URI, at the moment, must always point to a NATS object store bucket in the form of `nats://{bucket}/{key}`. Note that JetStream domains can be supplied on the workload request and aren't part of the bucket+key URL.

func (*Client) StopWorkload

func (api *Client) StopWorkload(stopRequest *StopRequest) (*StopResponse, error)

Attempts to stop a running workload. This can fail for a wide variety of reasons, the most common is likely to be security validation that prevents one issuer from submitting a stop request for another issuer's workload

type DeployRequest

type DeployRequest struct {
	Argv         []string           `json:"argv,omitempty"`
	Description  *string            `json:"description,omitempty"`
	WorkloadType models.NexWorkload `json:"type"`
	Location     *url.URL           `json:"location"`
	Essential    *bool              `json:"essential,omitempty"`

	// Contains claims for the workload: name, hash
	WorkloadJwt *string `json:"workload_jwt"`

	// A base64-encoded byte array that contains an encrypted json-serialized map[string]string.
	Environment *string `json:"environment"`

	// If the payload indicates an object store bucket & key, JS domain can be supplied
	JsDomain *string `json:"jsdomain,omitempty"`

	SenderPublicKey *string  `json:"sender_public_key"`
	TargetNode      *string  `json:"target_node"`
	TriggerSubjects []string `json:"trigger_subjects,omitempty"`

	RetryCount *uint      `json:"retry_count,omitempty"`
	RetriedAt  *time.Time `json:"retried_at,omitempty"`

	WorkloadEnvironment map[string]string `json:"-"`
	DecodedClaims       jwt.GenericClaims `json:"-"`
}

func NewDeployRequest

func NewDeployRequest(opts ...RequestOption) (*DeployRequest, error)

Creates a new deploy request based on the supplied options. Note that there is a fluent API function for each available option

func (*DeployRequest) DecryptRequestEnvironment

func (request *DeployRequest) DecryptRequestEnvironment(recipientXKey nkeys.KeyPair) error

func (*DeployRequest) Validate

func (request *DeployRequest) Validate() (*jwt.GenericClaims, error)

This will validate a request's workload JWT. It will not perform a comparison of the hash found in the claims with a recipient's expected hash

type EmittedEvent

type EmittedEvent struct {
	cloudevents.Event
	Namespace string `json:"namespace"`
	EventType string `json:"event_type"`
}

Note this a wrapper to add context to a cloud event

type EmittedLog

type EmittedLog struct {
	Namespace string `json:"namespace"`
	NodeId    string `json:"node_id"`
	Workload  string `json:"workload_id"`
	Timestamp string `json:"timestamp"`
	RawLog
}

Wrapper for what goes across the wire

type Envelope

type Envelope struct {
	PayloadType string      `json:"type"`
	Data        interface{} `json:"data,omitempty"`
	Error       interface{} `json:"error,omitempty"`
}

func NewEnvelope

func NewEnvelope(dataType string, data interface{}, err *string) Envelope

type HeartbeatEvent

type HeartbeatEvent struct {
	Version         string            `json:"version"`
	NodeId          string            `json:"node_id"`
	Nexus           string            `json:"nexus,omitempty"`
	Uptime          string            `json:"uptime"`
	Tags            map[string]string `json:"tags,omitempty"`
	RunningMachines int               `json:"running_machines"`
}

TODO: remove omitempty in next version bump

type InfoResponse

type InfoResponse struct {
	Version                string               `json:"version"`
	Uptime                 string               `json:"uptime"`
	PublicXKey             string               `json:"public_xkey"`
	Tags                   map[string]string    `json:"tags,omitempty"`
	Memory                 *MemoryStat          `json:"memory,omitempty"`
	Machines               []MachineSummary     `json:"machines"`
	SupportedWorkloadTypes []models.NexWorkload `json:"supported_workload_types,omitempty"`
}

type LameDuckEnteredEvent

type LameDuckEnteredEvent struct {
	Version string `json:"version"`
	Id      string `json:"id"`
}

type LameDuckResponse

type LameDuckResponse struct {
	NodeId  string `json:"node_id"`
	Success bool   `json:"success"`
}

type MachineSummary

type MachineSummary struct {
	Id        string          `json:"id"`
	Healthy   bool            `json:"healthy"`
	Uptime    string          `json:"uptime"`
	Namespace string          `json:"namespace,omitempty"`
	Workload  WorkloadSummary `json:"workload,omitempty"`
}

type MemoryStat

type MemoryStat struct {
	MemTotal     int `json:"total"`
	MemFree      int `json:"free"`
	MemAvailable int `json:"available"`
}

type NodeStartedEvent

type NodeStartedEvent struct {
	Version string `json:"version"`
	Id      string `json:"id"`
}

type NodeStoppedEvent

type NodeStoppedEvent struct {
	Id       string `json:"id"`
	Graceful bool   `json:"graceful"`
}

type PingResponse

type PingResponse struct {
	NodeId          string            `json:"node_id"`
	Nexus           string            `json:"nexus,omitempty"`
	Version         string            `json:"version"`
	Uptime          string            `json:"uptime"`
	TargetXkey      string            `json:"target_xkey"`
	Tags            map[string]string `json:"tags,omitempty"`
	RunningMachines int               `json:"running_machines"`
}

TODO: remove omitempty in next version bump

type RawLog

type RawLog struct {
	Text  string     `json:"text"`
	Level slog.Level `json:"level"`
	ID    string     `json:"id"`
}

type RequestOption

type RequestOption func(o requestOptions) requestOptions

func Argv

func Argv(argv []string) RequestOption

Arguments to be passed to the workload, if applicable

func Checksum

func Checksum(hash string) RequestOption

Sets the hash of the workload payload for verification purposes

func Environment

func Environment(env map[string]string) RequestOption

Set the map of environment variables to be used by the workload

func EnvironmentValue

func EnvironmentValue(key string, value string) RequestOption

Sets a single environment value

func Essential

func Essential(essential bool) RequestOption

Set the essential flag to be used by the workload

func Issuer

func Issuer(issuerAccountKey nkeys.KeyPair) RequestOption

An account key used to sign the JWT that accompanies the request and asserts the hash of the file

func JsDomain

func JsDomain(domain string) RequestOption

Optionally set a JetStream domain that will be used to locate an object store when necessary

func Location

func Location(workloadUrl string) RequestOption

Location of the workload. For files in NATS object stores, use nats://BUCKET/key

func SenderXKey

func SenderXKey(xkey nkeys.KeyPair) RequestOption

This is the sender's xkey. The public key will be placed on the request while the private key will be used to encrypt the environment variables

func TargetNode

func TargetNode(publicKey string) RequestOption

Sets the target execution engine node (a public key of type "server") for this request

func TargetPublicXKey

func TargetPublicXKey(key string) RequestOption

Sets the public key of the recipient (A public curve key). This must be set properly or the recipient of the request will be unable to decrypt the environment variables

func TriggerSubjects

func TriggerSubjects(triggerSubjects []string) RequestOption

Sets the trigger subjects to register for this request

func WorkloadDescription

func WorkloadDescription(name string) RequestOption

Description of the workload to run

func WorkloadName

func WorkloadName(name string) RequestOption

Name of the workload. Conforms to the same name rules as the services API

func WorkloadType

func WorkloadType(workloadType models.NexWorkload) RequestOption

Type of the workload, e.g., one of "native", "v8", "oci", "wasm" for this request

type RunResponse

type RunResponse struct {
	Started bool   `json:"started"`
	ID      string `json:"id"`
	Issuer  string `json:"issuer"`
	Name    string `json:"name"`
}

type StopRequest

type StopRequest struct {
	WorkloadId  string `json:"workload_id"`
	WorkloadJwt string `json:"workload_jwt"`
	TargetNode  string `json:"target_node"`
}

func NewStopRequest

func NewStopRequest(workloadId string, name string, targetNode string, issuer nkeys.KeyPair) (*StopRequest, error)

func (*StopRequest) Validate

func (request *StopRequest) Validate(originalClaims *jwt.GenericClaims) error

type StopResponse

type StopResponse struct {
	Stopped bool   `json:"stopped"`
	ID      string `json:"id"`
	Issuer  string `json:"issuer"`
	Name    string `json:"name"`
}

type WorkloadPingMachineSummary

type WorkloadPingMachineSummary struct {
	Id           string             `json:"id"`
	Namespace    string             `json:"namespace"`
	Name         string             `json:"name"`
	WorkloadType models.NexWorkload `json:"type"`
}

type WorkloadPingResponse

type WorkloadPingResponse struct {
	NodeId          string                       `json:"node_id"`
	TargetXkey      string                       `json:"target_xkey"`
	Version         string                       `json:"version"`
	Tags            map[string]string            `json:"tags,omitempty"`
	Uptime          string                       `json:"uptime"`
	RunningMachines []WorkloadPingMachineSummary `json:"running_machines"`
}

type WorkloadStartedEvent

type WorkloadStartedEvent struct {
	Name       string `json:"workload_name"`
	TotalBytes int    `json:"total_bytes"`
}

type WorkloadStoppedEvent

type WorkloadStoppedEvent struct {
	Name    string `json:"workload_name"`
	Code    int    `json:"code"`
	Message string `json:"message"`
}

type WorkloadSummary

type WorkloadSummary struct {
	Name         string             `json:"name"`
	Description  string             `json:"description,omitempty"`
	Runtime      string             `json:"runtime"`
	WorkloadType models.NexWorkload `json:"type"`
	Hash         string             `json:"hash"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL