node

package
v0.0.0-...-c39c2fe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2018 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConcurrentExecutor

type ConcurrentExecutor interface {
	// Run runs the provide ServiceNodeFn concurrently
	Run() error
}

ConcurrentExecutor executes functions on a collection of service nodes concurrently

func NewConcurrentExecutor

func NewConcurrentExecutor(
	nodes []ServiceNode,
	concurrency int,
	timeout time.Duration,
	fn ServiceNodeFn,
) ConcurrentExecutor

NewConcurrentExecutor returns a new concurrent executor

type Configuration

type Configuration struct {
	OperationTimeout   *time.Duration          `yaml:"operationTimeout"`
	TransferBufferSize *int                    `yaml:"transferBufferSize"`
	Retry              *xretry.Configuration   `yaml:"retry"`
	Heartbeat          *HeartbeatConfiguration `yaml:"heartbeat"`
}

Configuration is a YAML wrapper around Options

func (*Configuration) Options

func (c *Configuration) Options(iopts instrument.Options) Options

Options returns `Options` corresponding to the provided struct values

type HeartbeatConfiguration

type HeartbeatConfiguration struct {
	Enabled       *bool          `yaml:"enabled"`
	Timeout       *time.Duration `yaml:"timeout"`
	Interval      *time.Duration `yaml:"interval"`
	CheckInterval *time.Duration `yaml:"checkInterval"`
}

HeartbeatConfiguration is a YAML compatible wrapper around HeartbeatOptions

func (*HeartbeatConfiguration) Options

Options returns HeartbeatOptions corresponding to the values in the HeartbeatConfiguration

type HeartbeatOptions

type HeartbeatOptions interface {
	// Validate validates the HeartbeatOptions
	Validate() error

	// SetEnabled sets whether the Heartbeating is enabled
	SetEnabled(bool) HeartbeatOptions

	// Enabled returns whether the Heartbeating is enabled
	Enabled() bool

	// SetNowFn sets the NowFn
	SetNowFn(xclock.NowFn) HeartbeatOptions

	// NowFn returns the NowFn
	NowFn() xclock.NowFn

	// SetInterval sets the heartbeating interval
	SetInterval(time.Duration) HeartbeatOptions

	// Interval returns the heartbeating interval
	Interval() time.Duration

	// SetCheckInterval sets the frequency with which heartbeating timeouts
	// are checked
	SetCheckInterval(time.Duration) HeartbeatOptions

	// CheckInterval returns the frequency with which heartbeating timeouts
	// are checked
	CheckInterval() time.Duration

	// SetTimeout sets the heartbeat timeout duration, i.e. the window of
	// time after which missing heartbeats are considered errorneous
	SetTimeout(time.Duration) HeartbeatOptions

	// Timeout returns the heartbeat timeout duration, i.e. the window of
	// time after which missing heartbeats are considered errorneous
	Timeout() time.Duration

	// SetHeartbeatRouter sets the heartbeat router to be used
	SetHeartbeatRouter(HeartbeatRouter) HeartbeatOptions

	// HeartbeatRouter returns the heartbeat router in use
	HeartbeatRouter() HeartbeatRouter
}

HeartbeatOptions are the knobs to control heartbeating behavior

func NewHeartbeatOptions

func NewHeartbeatOptions() HeartbeatOptions

NewHeartbeatOptions returns the default HeartbeatOptions

type HeartbeatRouter

type HeartbeatRouter interface {
	hb.HeartbeaterServer

	// Endpoint returns the router endpoint
	Endpoint() string

	// Register registers the specified server under the given id
	Register(string, hb.HeartbeaterServer) error

	// Deregister un-registers any server registered under the given id
	Deregister(string) error
}

HeartbeatRouter routes heartbeats based on registered servers

func NewHeartbeatRouter

func NewHeartbeatRouter(endpoint string) HeartbeatRouter

NewHeartbeatRouter returns a new heartbeat router

type Listener

type Listener interface {
	// OnProcessTerminate is invoked when the remote process being run terminates
	OnProcessTerminate(node ServiceNode, desc string)

	// OnHeartbeatTimeout is invoked upon remote heartbeats having timed-out
	OnHeartbeatTimeout(node ServiceNode, lastHeartbeatTs time.Time)

	// OnOverwrite is invoked if remote agent control is overwritten by another
	// coordinator
	OnOverwrite(node ServiceNode, desc string)
}

Listener provides callbacks invoked upon remote process state transitions

func NewListener

func NewListener(
	onProcessTerminate func(ServiceNode, string),
	onHeartbeatTimeout func(ServiceNode, time.Time),
	onOverwrite func(ServiceNode, string),
) Listener

NewListener creates a new listener

type ListenerID

type ListenerID int

ListenerID is a unique identifier for a registered listener

type OperatorClientFn

type OperatorClientFn func() (*grpc.ClientConn, m3em.OperatorClient, error)

OperatorClientFn returns a function able to construct connections to remote Operators

type Options

type Options interface {
	// Validate validates the NodeOptions
	Validate() error

	// SetInstrumentOptions sets the instrumentation options
	SetInstrumentOptions(instrument.Options) Options

	// InstrumentOptions returns the instrumentation options
	InstrumentOptions() instrument.Options

	// SetOperationTimeout returns the timeout for node operations
	SetOperationTimeout(time.Duration) Options

	// OperationTimeout returns the timeout for node operations
	OperationTimeout() time.Duration

	// SetRetrier sets the retrier for node operations
	SetRetrier(xretry.Retrier) Options

	// OperationRetrier returns the retrier for node operations
	Retrier() xretry.Retrier

	// SetTransferBufferSize sets the bytes buffer size used during file transfer
	SetTransferBufferSize(int) Options

	// TransferBufferSize returns the bytes buffer size used during file transfer
	TransferBufferSize() int

	// SetMaxPullSize sets the max bytes retrieved from remote agents when
	// fetching output files
	SetMaxPullSize(int64) Options

	// MaxPullSize returns the max bytes retrieved from remote agents when
	// fetching output files
	MaxPullSize() int64

	// SetHeartbeatOptions sets the HeartbeatOptions
	SetHeartbeatOptions(HeartbeatOptions) Options

	// HeartbeatOptions returns the HeartbeatOptions
	HeartbeatOptions() HeartbeatOptions

	// SetOperatorClientFn sets the OperatorClientFn
	SetOperatorClientFn(OperatorClientFn) Options

	// OperatorClientFn returns the OperatorClientFn
	OperatorClientFn() OperatorClientFn
}

Options are the various knobs to control Node behavior

func NewOptions

func NewOptions(
	opts instrument.Options,
) Options

NewOptions returns a new Options construct.

type RemoteOutputType

type RemoteOutputType int

RemoteOutputType describes the various outputs available on the remote agents

const (
	// RemoteProcessStdout refers to the remote process stdout
	RemoteProcessStdout RemoteOutputType = iota

	// RemoteProcessStderr refers to the remote process stderr
	RemoteProcessStderr
)

type ServiceNode

type ServiceNode interface {
	placement.Instance

	// Setup initializes the directories, config file, and binary for the process being tested.
	// It does not Start the process on the ServiceNode.
	Setup(
		build build.ServiceBuild,
		config build.ServiceConfiguration,
		token string,
		force bool,
	) error

	// Start starts the service process for this ServiceNode.
	Start() error

	// Stop stops the service process for this ServiceNode.
	Stop() error

	// Status returns the ServiceNode status.
	Status() Status

	// Teardown releases any remote resources used for testing.
	Teardown() error

	// Close releases any locally held resources
	Close() error

	// RegisterListener registers an event listener
	RegisterListener(Listener) ListenerID

	// DeregisterListener un-registers an event listener
	DeregisterListener(ListenerID)

	// TransferLocalFile transfers a local file to the specified destination paths
	// NB: destPaths are not allowed to use relative path specifiers, i.e. '..' is illegal;
	// the eventual destination path on remote hosts is relative to the working directory
	// of the remote agent.
	// e.g. if the remote agent has working directory /var/m3em-agent, and we make the call:
	// svcNode.TransferLocalFile("some/local/file/path/id", []string{"path/id", "another/path/id"})
	//
	// upon success, there will be two new files under the remote agent working directory:
	// /var/m3em-agent/
	// /var/m3em-agent/path/id          <-- same contents as "some/local/file/path/id"
	// /var/m3em-agent/another/path/id  <-- same contents as "some/local/file/path/id"
	TransferLocalFile(localSrc string, destPaths []string, overwrite bool) error

	// GetRemoteOutput transfers the specified remote file to the specified path
	GetRemoteOutput(t RemoteOutputType, localDest string) (truncated bool, err error)
}

ServiceNode represents an executable service node. This object controls both the service and resources on the host running the service (e.g. fs, processes, etc.)

func New

func New(
	node placement.Instance,
	opts Options,
) (ServiceNode, error)

New returns a new ServiceNode.

type ServiceNodeFn

type ServiceNodeFn func(ServiceNode) error

ServiceNodeFn performs an operation on a given ServiceNode

type ServiceNodes

type ServiceNodes []ServiceNode

ServiceNodes is a collection of ServiceNode(s)

type Status

type Status int

Status indicates the different states a ServiceNode can be in. The state diagram below describes the transitions between the various states:

                ┌──────────────────┐
                │                  │
┌Teardown()─────│      Error       │
│               │                  │
│               └──────────────────┘
│
▼

┌──────────────────┐ ┌───────────────-──┐ │ │ Setup() │ │ │ Uninitialized ├────────────────────────▶│ Setup │◀─┐ │ │◀───────────┐ │ │ │ └──────────────────┘ Teardown()└────────────└──────────────────┘ │

▲                                            │           │
│                                            │           │
│                                            │           │
│                                  Start()   │           │
│                              ┌─────────────┘           │
│                              │                         │
│                              │                         │
│                              │                         │
│                              ▼                         │
│                    ┌──────────────────┐                │
│Teardown()          │                  │                |
└────────────────────│     Running      │────────────Stop()
                     │                  │
                     └──────────────────┘
const (
	// StatusUninitialized refers to the state of an un-initialized node.
	StatusUninitialized Status = iota

	// StatusSetup is the state of a node which has been Setup()
	StatusSetup

	// StatusRunning is the state of a node which has been Start()-ed
	StatusRunning

	// StatusError is the state of a node which is in an Error state
	StatusError
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL