Documentation
Index
- type Executor
- func (e *Executor) Execs(ctx context.Context) ([]reflow.Exec, error)
- func (e *Executor) Get(ctx context.Context, id digest.Digest) (reflow.Exec, error)
- func (e *Executor) Kill(ctx context.Context) error
- func (e *Executor) Load(ctx context.Context, repo *url.URL, fs reflow.Fileset) (reflow.Fileset, error)
- func (e *Executor) Put(ctx context.Context, id digest.Digest, cfg reflow.ExecConfig) (reflow.Exec, error)
- func (e *Executor) Remove(ctx context.Context, id digest.Digest) error
- func (e *Executor) Repository() reflow.Repository
- func (e *Executor) Resources() reflow.Resources
- func (e *Executor) SetResources(r reflow.Resources)
- func (e *Executor) Start() error
- func (e *Executor) URI() string
- func (e *Executor) Unload(ctx context.Context, fs reflow.Fileset) error
- func (e *Executor) VerifyIntegrity(ctx context.Context, fs reflow.Fileset) error
- type Manifest
- type OomDetector
- type Pool
- func (p *Pool) Alloc(ctx context.Context, id string) (pool.Alloc, error)
- func (p *Pool) Kill(a pool.Alloc) error
- func (p *Pool) MaintainTaskDBRow(ctx context.Context)
- func (p *Pool) Name() string
- func (p *Pool) New(ctx context.Context, id string, meta pool.AllocMeta, keepalive time.Duration, ...) (pool.Alloc, error)
- func (p *Pool) Resources() reflow.Resources
- func (p *Pool) Start(expectedUsableMemBytes int64) error
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Executor
type Executor struct {
	// RunID is the ID of the run, of the form <username>@grailbio.com/<hash>.
	RunID string
	// ID is the ID of the executor. It is the URI of the executor and also
	// the prefix used in any Docker containers whose execs are
	// children of this executor.
	ID string
	// Prefix is the filesystem prefix used to access paths on disk. This is
	// defined so that the executor can run inside of a Docker container
	// (which has the host's filesystem exported at this prefix).
	Prefix string
	// Dir is the root directory of this executor. All of its state is contained
	// within it.
	Dir string
	// Client is the Docker client used by this executor.
	Client *docker.Client
	// Authenticator is used to pull images that are stored on Amazon's ECR
	// service.
	Authenticator ecrauth.Interface
	// AWSCreds is an AWS credentials provider, used for "$aws" passthroughs.
	AWSCreds *credentials.Credentials
	// Log is this executor's logger where operational status is printed.
	Log *log.Logger
	// FileRepository is the (file-based) object repository used by this
	// Executor. It may be provided by the user, or else it is set to a
	// default implementation when (*Executor).Start is called.
	FileRepository *filerepo.Repository
	// HardMemLimit restricts an exec's memory limit to the exec's resource requirements.
	HardMemLimit bool

	Blob blob.Mux

	// NodeOomDetector is an OOM detector based on node metrics.
	NodeOomDetector OomDetector
	// IntegrityErrSignal is a channel for signaling an integrity issue with
	// the EC2 instance's EBS volume(s). The signal is sent by this Executor
	// if a file fails integrity verification in Load or VerifyIntegrity.
	IntegrityErrSignal chan struct{}
	// SaveLogsToRepo determines whether the execs used by this Executor save
	// their raw stdout/stderr logs during Exec.RunInfo.
	SaveLogsToRepo bool
	// contains filtered or unexported fields
}
Executor is a small management layer on top of exec. It implements reflow.Executor. Executor assumes that it has local access to the file system (perhaps with a prefix).
Executor stores its state to disk and, when recovered, re-instantiates all execs (which in turn recover).
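The IntegrityErrSignal field is documented as the channel on which the Executor reports an EBS integrity problem. The following is a minimal sketch of one way such a signal could be sent without blocking the failing operation. It assumes the channel is created with a buffer of one and that repeated failures may be coalesced into a single pending signal. The helper `signalIntegrityErr` is hypothetical and is not part of the package.

```go
package main

import "fmt"

// signalIntegrityErr is a hypothetical helper. It attempts a non-blocking
// send on ch so that an integrity failure never stalls the caller when no
// receiver is ready. It reports whether the signal was delivered.
func signalIntegrityErr(ch chan struct{}) bool {
	if ch == nil {
		// No channel configured: nothing to signal.
		return false
	}
	select {
	case ch <- struct{}{}:
		return true
	default:
		// A signal is already pending (buffer full) or nobody is receiving.
		return false
	}
}

func main() {
	// A buffer of one lets a single pending signal be queued.
	sig := make(chan struct{}, 1)
	fmt.Println(signalIntegrityErr(sig)) // true: queued
	fmt.Println(signalIntegrityErr(sig)) // false: already pending
	<-sig
	fmt.Println(signalIntegrityErr(nil)) // false: no channel
}
```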
func (*Executor) Get
func (e *Executor) Get(ctx context.Context, id digest.Digest) (reflow.Exec, error)
Get returns the exec with the given ID, or an errors.NotExist error if the exec does not exist.
func (*Executor) Kill
func (e *Executor) Kill(ctx context.Context) error
Kill disposes of the executor and all of its execs. It also sets the executor's "dead" flag, so that all future operations on the executor return an error.
func (*Executor) Load
func (e *Executor) Load(ctx context.Context, repo *url.URL, fs reflow.Fileset) (reflow.Fileset, error)
Load loads the fileset into the executor's repository. If the fileset is resolved, it is loaded from the specified backing repository; otherwise, each file is loaded from its source.
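The choice Load makes can be pictured for a single file. A file whose content digest is already known is fetched from the backing repository named by repo. A file that is still an unresolved reference is fetched from its source. The sketch below simplifies the documented fileset-level rule to a per-file one. The `file` type, its fields, and the two fetch callbacks are hypothetical stand-ins, not reflow's API.

```go
package main

import "fmt"

// file is a hypothetical stand-in for a reflow file. Digest is set once the
// file is resolved; Source names where an unresolved file can be fetched.
type file struct {
	Digest string
	Source string
}

// resolved reports whether the file's content digest is known.
func (f file) resolved() bool { return f.Digest != "" }

// loadFile picks the origin for a single file: the backing repository for a
// resolved file, or the file's own source otherwise.
func loadFile(f file, repo string,
	fetchFromRepo func(repo, digest string) error,
	fetchFromSource func(source string) error) error {
	if f.resolved() {
		return fetchFromRepo(repo, f.Digest)
	}
	return fetchFromSource(f.Source)
}

func main() {
	fromRepo := func(repo, d string) error { fmt.Println("repo:", repo, d); return nil }
	fromSource := func(src string) error { fmt.Println("source:", src); return nil }

	// The repository and source URLs below are made-up examples.
	_ = loadFile(file{Digest: "sha256:abc"}, "s3://bucket/repo", fromRepo, fromSource)
	_ = loadFile(file{Source: "s3://bucket/input.bam"}, "s3://bucket/repo", fromRepo, fromSource)
}
```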
func (*Executor) Put
func (e *Executor) Put(ctx context.Context, id digest.Digest, cfg reflow.ExecConfig) (reflow.Exec, error)
Put idempotently defines a new exec with a given ID and config. The exec may be (deterministically) rewritten.
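Idempotent definition means that a second Put with an ID that is already defined returns the existing exec rather than creating a duplicate. This is a minimal in-memory sketch of that behavior. The `executor`, `exec`, and `config` types are hypothetical. It assumes, as the digest-typed ID suggests, that an ID identifies its configuration, so the config passed on a repeat call is not re-examined.

```go
package main

import (
	"fmt"
	"sync"
)

// config is a hypothetical stand-in for reflow.ExecConfig.
type config struct{ Image, Cmd string }

// exec is a hypothetical stand-in for reflow.Exec.
type exec struct {
	ID  string
	Cfg config
}

// executor models only the idempotent-definition behavior of Put.
type executor struct {
	mu    sync.Mutex
	execs map[string]*exec
}

// Put returns the existing exec for id if one is already defined; otherwise it
// creates, records, and returns a new one. Repeated calls with the same id
// therefore yield the same exec.
func (e *executor) Put(id string, cfg config) *exec {
	e.mu.Lock()
	defer e.mu.Unlock()
	if x, ok := e.execs[id]; ok {
		return x
	}
	if e.execs == nil {
		e.execs = make(map[string]*exec)
	}
	x := &exec{ID: id, Cfg: cfg}
	e.execs[id] = x
	return x
}

func main() {
	e := &executor{}
	a := e.Put("a1", config{Image: "ubuntu", Cmd: "echo hi"})
	b := e.Put("a1", config{Image: "ubuntu", Cmd: "echo hi"})
	fmt.Println(a == b) // true
}
```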
func (*Executor) Repository
func (e *Executor) Repository() reflow.Repository
Repository returns the repository attached to this executor.
func (*Executor) SetResources
func (e *Executor) SetResources(r reflow.Resources)
SetResources sets the resources reported by Resources() to r.
func (*Executor) Start
func (e *Executor) Start() error
Start initializes the executor and recovers previously stored state. It re-initializes all stored execs.
type Manifest
type Manifest struct {
	Type  execType
	State execState

	PID     int
	Created time.Time

	Result    reflow.Result
	Config    reflow.ExecConfig   // The object config used to create this object.
	Docker    types.ContainerJSON // Docker inspect output.
	Resources reflow.Resources
	Stats     stats
	Gauges    reflow.Gauges
}
Manifest stores the state of an exec. It is serialized to JSON and stored on disk so that executors are restartable, and can recover from crashes.
type OomDetector
type OomDetector interface {
	// Oom returns whether an OOM occurred for the given process ID within the given time range,
	// and a string with an explanation of why (if true) an OOM occurrence determination was made.
	// If pid is unspecified (i.e., -1), then implementations can make a "possible OOM" determination.
	Oom(pid int, start, end time.Time) (bool, string)
}
OomDetector detects if an OOM has occurred.
type Pool
type Pool struct {
	pool.ResourcePool
	// Dir is the filesystem root of the pool. Everything under this
	// path is assumed to be owned and managed by the pool.
	Dir string
	// Prefix is prepended to paths constructed by allocs. This is to
	// permit running the pool manager inside of a Docker container.
	Prefix string
	// Client is the Docker client. We assume that the Docker daemon
	// runs on the same host from which the pool is managed.
	Client *docker.Client
	// Authenticator is used to authenticate ECR image pulls.
	Authenticator interface {
		Authenticates(ctx context.Context, image string) (bool, error)
		Authenticate(ctx context.Context, cfg *types.AuthConfig) error
	}
	// AWSCreds is a credentials provider used to mint AWS credentials.
	// They are used to access AWS services.
	AWSCreds *credentials.Credentials
	// Session is the AWS session to use for AWS API calls.
	Session *session.Session
	// Blob is the blob store implementation used to fetch data from interns.
	Blob blob.Mux
	// TaskDBPoolId is the identifier of this Pool in TaskDB.
	TaskDBPoolId reflow.StringDigest
	TaskDB       taskdb.TaskDB
	// Log is the pool's logger.
	Log          *log.Logger
	HardMemLimit bool
	// NodeOomDetector is an OOM detector based on node metrics.
	NodeOomDetector OomDetector
	// IntegrityErrSignal is a channel for signaling an integrity issue with
	// the EC2 instance's EBS volume(s). The signal is sent by any of the
	// Pool's Executors as a result of a file integrity error.
	IntegrityErrSignal chan struct{}
}
Pool implements a resource pool on top of a Docker client. The pool itself must run on the same machine as the Docker daemon, because it performs local filesystem operations that must be reflected inside the containers.
Pool keeps all state on disk, as follows:
Prefix/Dir/state.json
	Stores the set of currently active allocs, together with their resource requirements.
Prefix/Dir/allocs/<id>/
	The root directory for the alloc with the given id. The state under this directory is managed by an executor instance.
func (*Pool) MaintainTaskDBRow
func (p *Pool) MaintainTaskDBRow(ctx context.Context)
MaintainTaskDBRow maintains the TaskDB row corresponding to this pool (if applicable). If this pool has a TaskDB implementation and a pool ID (TaskDBPoolId) set, MaintainTaskDBRow blocks until the given context is done; otherwise, it returns immediately. The TaskDB row is expected to already exist: MaintainTaskDBRow updates its Resources, maintains its keepalive until ctx is canceled, and then updates the row's End time. MaintainTaskDBRow panics if called on a Pool with no resources (i.e., the pool must have been started).
func (*Pool) New
func (p *Pool) New(ctx context.Context, id string, meta pool.AllocMeta, keepalive time.Duration, existing []pool.Alloc) (pool.Alloc, error)
New implements `pool.AllocManager`. New creates a new alloc with the given id, alloc meta, and initial keepalive. The list of other existing allocs is provided so that the state of all allocs can be saved atomically.
func (*Pool) Start
func (p *Pool) Start(expectedUsableMemBytes int64) error
Start starts the pool. If the pool has a state snapshot, Start restores the pool's previous state. Start also makes sure that all zombie allocs are collected. expectedUsableMemBytes is the amount of memory expected to be usable on this pool; if the available memory is less than this, Start returns an error. Set expectedUsableMemBytes to a small number (e.g., zero) to signify that there is no specific expectation.
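The memory expectation reduces to a single comparison, which also shows why zero disables the check: any non-negative amount of available memory satisfies it. `checkUsableMem` is a hypothetical helper. How Start actually measures available memory is not described here, so the sketch takes that value as a parameter.

```go
package main

import "fmt"

// checkUsableMem returns an error when the memory actually available is below
// the expected amount. An expectation of zero (or any small value) effectively
// disables the check.
func checkUsableMem(availableBytes, expectedBytes int64) error {
	if availableBytes < expectedBytes {
		return fmt.Errorf("usable memory %d bytes is less than expected %d bytes", availableBytes, expectedBytes)
	}
	return nil
}

func main() {
	const gib = int64(1) << 30
	fmt.Println(checkUsableMem(60*gib, 0) == nil)      // true: no expectation
	fmt.Println(checkUsableMem(60*gib, 64*gib) == nil) // false: shortfall
}
```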