Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConcurrentExecutor ¶
type ConcurrentExecutor interface { // Run runs the provide ServiceNodeFn concurrently Run() error }
ConcurrentExecutor executes functions on a collection of service nodes concurrently
func NewConcurrentExecutor ¶
func NewConcurrentExecutor( nodes []ServiceNode, concurrency int, timeout time.Duration, fn ServiceNodeFn, ) ConcurrentExecutor
NewConcurrentExecutor returns a new concurrent executor
type Configuration ¶
type Configuration struct { OperationTimeout *time.Duration `yaml:"operationTimeout"` TransferBufferSize *int `yaml:"transferBufferSize"` Retry *xretry.Configuration `yaml:"retry"` Heartbeat *HeartbeatConfiguration `yaml:"heartbeat"` }
Configuration is a YAML wrapper around Options
func (*Configuration) Options ¶
func (c *Configuration) Options(iopts instrument.Options) Options
Options returns `Options` corresponding to the provided struct values
type HeartbeatConfiguration ¶
type HeartbeatConfiguration struct { Enabled *bool `yaml:"enabled"` Timeout *time.Duration `yaml:"timeout"` Interval *time.Duration `yaml:"interval"` CheckInterval *time.Duration `yaml:"checkInterval"` }
HeartbeatConfiguration is a YAML compatible wrapper around HeartbeatOptions
func (*HeartbeatConfiguration) Options ¶
func (h *HeartbeatConfiguration) Options() HeartbeatOptions
Options returns HeartbeatOptions corresponding to the values in the HeartbeatConfiguration
type HeartbeatOptions ¶
type HeartbeatOptions interface { // Validate validates the HeartbeatOptions Validate() error // SetEnabled sets whether the Heartbeating is enabled SetEnabled(bool) HeartbeatOptions // Enabled returns whether the Heartbeating is enabled Enabled() bool // SetNowFn sets the NowFn SetNowFn(xclock.NowFn) HeartbeatOptions // NowFn returns the NowFn NowFn() xclock.NowFn // SetInterval sets the heartbeating interval SetInterval(time.Duration) HeartbeatOptions // Interval returns the heartbeating interval Interval() time.Duration // SetCheckInterval sets the frequency with which heartbeating timeouts // are checked SetCheckInterval(time.Duration) HeartbeatOptions // CheckInterval returns the frequency with which heartbeating timeouts // are checked CheckInterval() time.Duration // SetTimeout sets the heartbeat timeout duration, i.e. the window of // time after which missing heartbeats are considered errorneous SetTimeout(time.Duration) HeartbeatOptions // Timeout returns the heartbeat timeout duration, i.e. the window of // time after which missing heartbeats are considered errorneous Timeout() time.Duration // SetHeartbeatRouter sets the heartbeat router to be used SetHeartbeatRouter(HeartbeatRouter) HeartbeatOptions // HeartbeatRouter returns the heartbeat router in use HeartbeatRouter() HeartbeatRouter }
HeartbeatOptions are the knobs to control heartbeating behavior
func NewHeartbeatOptions ¶
func NewHeartbeatOptions() HeartbeatOptions
NewHeartbeatOptions returns the default HeartbeatOptions
type HeartbeatRouter ¶
type HeartbeatRouter interface { hb.HeartbeaterServer // Endpoint returns the router endpoint Endpoint() string // Register registers the specified server under the given id Register(string, hb.HeartbeaterServer) error // Deregister un-registers any server registered under the given id Deregister(string) error }
HeartbeatRouter routes heartbeats based on registered servers
func NewHeartbeatRouter ¶
func NewHeartbeatRouter(endpoint string) HeartbeatRouter
NewHeartbeatRouter returns a new heartbeat router
type Listener ¶
type Listener interface { // OnProcessTerminate is invoked when the remote process being run terminates OnProcessTerminate(node ServiceNode, desc string) // OnHeartbeatTimeout is invoked upon remote heartbeats having timed-out OnHeartbeatTimeout(node ServiceNode, lastHeartbeatTs time.Time) // OnOverwrite is invoked if remote agent control is overwritten by another // coordinator OnOverwrite(node ServiceNode, desc string) }
Listener provides callbacks invoked upon remote process state transitions
func NewListener ¶
func NewListener( onProcessTerminate func(ServiceNode, string), onHeartbeatTimeout func(ServiceNode, time.Time), onOverwrite func(ServiceNode, string), ) Listener
NewListener creates a new listener
type OperatorClientFn ¶
type OperatorClientFn func() (*grpc.ClientConn, m3em.OperatorClient, error)
OperatorClientFn returns a function able to construct connections to remote Operators
type Options ¶
type Options interface { // Validate validates the NodeOptions Validate() error // SetInstrumentOptions sets the instrumentation options SetInstrumentOptions(instrument.Options) Options // InstrumentOptions returns the instrumentation options InstrumentOptions() instrument.Options // SetOperationTimeout returns the timeout for node operations SetOperationTimeout(time.Duration) Options // OperationTimeout returns the timeout for node operations OperationTimeout() time.Duration // SetRetrier sets the retrier for node operations SetRetrier(xretry.Retrier) Options // OperationRetrier returns the retrier for node operations Retrier() xretry.Retrier // SetTransferBufferSize sets the bytes buffer size used during file transfer SetTransferBufferSize(int) Options // TransferBufferSize returns the bytes buffer size used during file transfer TransferBufferSize() int // SetMaxPullSize sets the max bytes retrieved from remote agents when // fetching output files SetMaxPullSize(int64) Options // MaxPullSize returns the max bytes retrieved from remote agents when // fetching output files MaxPullSize() int64 // SetHeartbeatOptions sets the HeartbeatOptions SetHeartbeatOptions(HeartbeatOptions) Options // HeartbeatOptions returns the HeartbeatOptions HeartbeatOptions() HeartbeatOptions // SetOperatorClientFn sets the OperatorClientFn SetOperatorClientFn(OperatorClientFn) Options // OperatorClientFn returns the OperatorClientFn OperatorClientFn() OperatorClientFn }
Options are the various knobs to control Node behavior
func NewOptions ¶
func NewOptions( opts instrument.Options, ) Options
NewOptions returns a new Options construct.
type RemoteOutputType ¶
type RemoteOutputType int
RemoteOutputType describes the various outputs available on the remote agents
const ( // RemoteProcessStdout refers to the remote process stdout RemoteProcessStdout RemoteOutputType = iota // RemoteProcessStderr refers to the remote process stderr RemoteProcessStderr )
type ServiceNode ¶
type ServiceNode interface { placement.Instance // Setup initializes the directories, config file, and binary for the process being tested. // It does not Start the process on the ServiceNode. Setup( build build.ServiceBuild, config build.ServiceConfiguration, token string, force bool, ) error // Start starts the service process for this ServiceNode. Start() error // Stop stops the service process for this ServiceNode. Stop() error // Status returns the ServiceNode status. Status() Status // Teardown releases any remote resources used for testing. Teardown() error // Close releases any locally held resources Close() error // RegisterListener registers an event listener RegisterListener(Listener) ListenerID // DeregisterListener un-registers an event listener DeregisterListener(ListenerID) // TransferLocalFile transfers a local file to the specified destination paths // NB: destPaths are not allowed to use relative path specifiers, i.e. '..' is illegal; // the eventual destination path on remote hosts is relative to the working directory // of the remote agent. // e.g. if the remote agent has working directory /var/m3em-agent, and we make the call: // svcNode.TransferLocalFile("some/local/file/path/id", []string{"path/id", "another/path/id"}) // // upon success, there will be two new files under the remote agent working directory: // /var/m3em-agent/ // /var/m3em-agent/path/id <-- same contents as "some/local/file/path/id" // /var/m3em-agent/another/path/id <-- same contents as "some/local/file/path/id" TransferLocalFile(localSrc string, destPaths []string, overwrite bool) error // GetRemoteOutput transfers the specified remote file to the specified path GetRemoteOutput(t RemoteOutputType, localDest string) (truncated bool, err error) }
ServiceNode represents an executable service node. This object controls both the service and resources on the host running the service (e.g. fs, processes, etc.)
type ServiceNodeFn ¶
type ServiceNodeFn func(ServiceNode) error
ServiceNodeFn performs an operation on a given ServiceNode
type Status ¶
type Status int
Status indicates the different states a ServiceNode can be in. The state diagram below describes the transitions between the various states:
┌──────────────────┐ │ │ ┌Teardown()─────│ Error │ │ │ │ │ └──────────────────┘ │ ▼
┌──────────────────┐ ┌───────────────-──┐ │ │ Setup() │ │ │ Uninitialized ├────────────────────────▶│ Setup │◀─┐ │ │◀───────────┐ │ │ │ └──────────────────┘ Teardown()└────────────└──────────────────┘ │
▲ │ │ │ │ │ │ │ │ │ Start() │ │ │ ┌─────────────┘ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │Teardown() │ │ | └────────────────────│ Running │────────────Stop() │ │ └──────────────────┘
const ( // StatusUninitialized refers to the state of an un-initialized node. StatusUninitialized Status = iota // StatusSetup is the state of a node which has been Setup() StatusSetup // StatusRunning is the state of a node which has been Start()-ed StatusRunning // StatusError is the state of a node which is in an Error state StatusError )