Documentation ¶
Overview ¶
Package worker will eventually evolve to becoming a concrete implementation of a runtime As such, Concourse core shouldn't depend on abstractions defined in this package or its child packages General Runtime abstractions will be ported over to the Runtime package The Client interface is the main interface that is consumed by Concourse core that will be shifted to the Runtime package
Index ¶
- Constants
- Variables
- func NewClient(pool Pool, provider WorkerProvider, compression compression.Compression, ...) *client
- func NewContainerPlacementStrategy(opts ContainerPlacementStrategyOptions) (*containerPlacementStrategy, error)
- type ArtifactDestination
- type ArtifactSource
- type BindMountSource
- type CertsVolumeMount
- type CheckResult
- type Client
- type Container
- type ContainerLimits
- type ContainerPlacementStrategy
- type ContainerPlacementStrategyChainNode
- type ContainerPlacementStrategyOptions
- type ContainerSpec
- type ErrCreatedVolumeNotFound
- type FetchSource
- type FetchSourceFactory
- type FetchedImage
- type Fetcher
- type FewestBuildContainersPlacementStrategyNode
- type GetResult
- type Image
- type ImageFactory
- type ImageMetadata
- type ImageSpec
- type InputSource
- type LimitActiveTasksPlacementStrategyNode
- type NoCompatibleWorkersError
- type OutputPaths
- type Pool
- type PutResult
- type StreamableArtifactSource
- type TaskResult
- type Volume
- type VolumeClient
- type VolumeLocalityPlacementStrategyNode
- type VolumeMount
- type VolumeProperties
- type VolumeSpec
- type Worker
- type WorkerProvider
- type WorkerSpec
Constants ¶
const GetResourceLockInterval = 5 * time.Second
Variables ¶
var ( ErrNoWorkers = errors.New("no workers") ErrFailedAcquirePoolLock = errors.New("failed to acquire pool lock") )
var ErrBaseResourceTypeNotFound = errors.New("base resource type not found")
var ErrFailedToGetLock = errors.New("failed to get lock")
var ErrMissingVolume = errors.New("volume mounted to container is missing")
var GardenLimitDefault = uint64(0)
var ResourceConfigCheckSessionExpiredError = errors.New("no db container was found for owner")
Functions ¶
func NewClient ¶
func NewClient(pool Pool, provider WorkerProvider, compression compression.Compression, workerPollingInterval time.Duration, workerStatusPublishInterval time.Duration, enabledP2pStreaming bool, p2pStreamingTimeout time.Duration, ) *client
func NewContainerPlacementStrategy ¶
func NewContainerPlacementStrategy(opts ContainerPlacementStrategyOptions) (*containerPlacementStrategy, error)
Types ¶
type ArtifactDestination ¶
type ArtifactDestination interface { // StreamIn is called with a destination directory and the tar stream to // expand into the destination directory. StreamIn(context.Context, string, baggageclaim.Encoding, io.Reader) error GetStreamInP2pUrl(ctx context.Context, path string) (string, error) }
Destination is the inverse of Source. This interface allows the receiving end to determine the location of the data, e.g. based on a task's input configuration.
type ArtifactSource ¶
type ArtifactSource interface { // ExistsOn attempts to locate a volume equivalent to this source on the // given worker. If a volume can be found, it will be used directly. If not, // `StreamTo` will be used to copy the data to the destination instead. ExistsOn(lager.Logger, Worker) (Volume, bool, error) }
func NewCacheArtifactSource ¶
func NewCacheArtifactSource(artifact runtime.CacheArtifact) ArtifactSource
type BindMountSource ¶
type CertsVolumeMount ¶
type CheckResult ¶
type Client ¶
type Client interface { FindContainer(logger lager.Logger, teamID int, handle string) (Container, bool, error) FindVolume(logger lager.Logger, teamID int, handle string) (Volume, bool, error) CreateVolume(logger lager.Logger, vSpec VolumeSpec, wSpec WorkerSpec, volumeType db.VolumeType) (Volume, error) StreamFileFromArtifact( ctx context.Context, logger lager.Logger, artifact runtime.Artifact, filePath string, ) (io.ReadCloser, error) RunCheckStep( context.Context, lager.Logger, db.ContainerOwner, ContainerSpec, WorkerSpec, ContainerPlacementStrategy, db.ContainerMetadata, runtime.ProcessSpec, runtime.StartingEventDelegate, resource.Resource, time.Duration, ) (CheckResult, error) RunTaskStep( context.Context, lager.Logger, db.ContainerOwner, ContainerSpec, WorkerSpec, ContainerPlacementStrategy, db.ContainerMetadata, runtime.ProcessSpec, runtime.StartingEventDelegate, lock.LockFactory, ) (TaskResult, error) RunPutStep( context.Context, lager.Logger, db.ContainerOwner, ContainerSpec, WorkerSpec, ContainerPlacementStrategy, db.ContainerMetadata, runtime.ProcessSpec, runtime.StartingEventDelegate, resource.Resource, ) (PutResult, error) RunGetStep( context.Context, lager.Logger, db.ContainerOwner, ContainerSpec, WorkerSpec, ContainerPlacementStrategy, db.ContainerMetadata, runtime.ProcessSpec, runtime.StartingEventDelegate, db.UsedResourceCache, resource.Resource, ) (GetResult, error) }
type ContainerLimits ¶
func (ContainerLimits) ToGardenLimits ¶
func (cl ContainerLimits) ToGardenLimits() garden.Limits
type ContainerPlacementStrategy ¶
type ContainerPlacementStrategy interface { //TODO: Don't pass around container metadata since it's not guaranteed to be deterministic. // Change this after check containers stop being reused Choose(lager.Logger, []Worker, ContainerSpec) (Worker, error) ModifiesActiveTasks() bool }
func NewRandomPlacementStrategy ¶
func NewRandomPlacementStrategy() ContainerPlacementStrategy
type ContainerSpec ¶
type ContainerSpec struct { TeamID int ImageSpec ImageSpec Env []string Type db.ContainerType // Working directory for processes run in the container. Dir string // artifacts configured as usable. The key reps the mount path of the input artifact // and value is the artifact itself ArtifactByPath map[string]runtime.Artifact // Inputs to provide to the container. Inputs with a volume local to the // selected worker will be made available via a COW volume; others will be // streamed. Inputs []InputSource // Outputs for which volumes should be created and mounted into the container. Outputs OutputPaths // Resource limits to be set on the container when creating in garden. Limits ContainerLimits // Local volumes to bind mount directly to the container when creating in garden. BindMounts []BindMountSource // Optional user to run processes as. Overwrites the one specified in the docker image. User string }
func (*ContainerSpec) Get ¶
func (cs *ContainerSpec) Get(key string) string
func (*ContainerSpec) Set ¶
func (cs *ContainerSpec) Set(key string, value string)
type ErrCreatedVolumeNotFound ¶
func (ErrCreatedVolumeNotFound) Error ¶
func (e ErrCreatedVolumeNotFound) Error() string
type FetchSource ¶
type FetchSourceFactory ¶
type FetchSourceFactory interface { NewFetchSource( logger lager.Logger, worker Worker, owner db.ContainerOwner, cache db.UsedResourceCache, resource resource.Resource, containerSpec ContainerSpec, processSpec runtime.ProcessSpec, containerMetadata db.ContainerMetadata, ) FetchSource }
func NewFetchSourceFactory ¶
func NewFetchSourceFactory( resourceCacheFactory db.ResourceCacheFactory, ) FetchSourceFactory
type FetchedImage ¶
type FetchedImage struct { Metadata ImageMetadata Version atc.Version URL string Privileged bool }
type Fetcher ¶
type Fetcher interface { Fetch( ctx context.Context, logger lager.Logger, containerMetadata db.ContainerMetadata, gardenWorker Worker, containerSpec ContainerSpec, processSpec runtime.ProcessSpec, resource resource.Resource, owner db.ContainerOwner, cache db.UsedResourceCache, lockName string, ) (GetResult, Volume, error) }
func NewFetcher ¶
func NewFetcher( clock clock.Clock, lockFactory lock.LockFactory, fetchSourceFactory FetchSourceFactory, ) Fetcher
type FewestBuildContainersPlacementStrategyNode ¶
type FewestBuildContainersPlacementStrategyNode struct{}
func (*FewestBuildContainersPlacementStrategyNode) Choose ¶
func (strategy *FewestBuildContainersPlacementStrategyNode) Choose(logger lager.Logger, workers []Worker, spec ContainerSpec) ([]Worker, error)
func (*FewestBuildContainersPlacementStrategyNode) ModifiesActiveTasks ¶
func (strategy *FewestBuildContainersPlacementStrategyNode) ModifiesActiveTasks() bool
type GetResult ¶
type GetResult struct { ExitStatus int VersionResult runtime.VersionResult GetArtifact runtime.GetArtifact }
type Image ¶
type Image interface { FetchForContainer( ctx context.Context, logger lager.Logger, container db.CreatingContainer, ) (FetchedImage, error) }
type ImageFactory ¶
type ImageMetadata ¶
type InputSource ¶
type InputSource interface { Source() ArtifactSource DestinationPath() string }
type LimitActiveTasksPlacementStrategyNode ¶
type LimitActiveTasksPlacementStrategyNode struct {
// contains filtered or unexported fields
}
func (*LimitActiveTasksPlacementStrategyNode) Choose ¶
func (strategy *LimitActiveTasksPlacementStrategyNode) Choose(logger lager.Logger, workers []Worker, spec ContainerSpec) ([]Worker, error)
func (*LimitActiveTasksPlacementStrategyNode) ModifiesActiveTasks ¶
func (strategy *LimitActiveTasksPlacementStrategyNode) ModifiesActiveTasks() bool
type NoCompatibleWorkersError ¶
type NoCompatibleWorkersError struct {
Spec WorkerSpec
}
func (NoCompatibleWorkersError) Error ¶
func (err NoCompatibleWorkersError) Error() string
type OutputPaths ¶
OutputPaths is a mapping from output name to its path in the container.
type Pool ¶
type Pool interface { FindOrChooseWorker( lager.Logger, WorkerSpec, ) (Worker, error) ContainerInWorker( lager.Logger, db.ContainerOwner, WorkerSpec, ) (bool, error) FindOrChooseWorkerForContainer( context.Context, lager.Logger, db.ContainerOwner, ContainerSpec, WorkerSpec, ContainerPlacementStrategy, ) (Worker, error) }
func NewPool ¶
func NewPool( provider WorkerProvider, ) Pool
type PutResult ¶
type PutResult struct { ExitStatus int VersionResult runtime.VersionResult }
type StreamableArtifactSource ¶
type StreamableArtifactSource interface { ArtifactSource // StreamTo copies the data from the source to the destination. Note that // this potentially uses a lot of network transfer, for larger artifacts, as // the ATC will effectively act as a middleman. StreamTo(context.Context, ArtifactDestination) error // StreamFile returns the contents of a single file in the artifact source. // This is used for loading a task's configuration at runtime. StreamFile(context.Context, string) (io.ReadCloser, error) }
Source represents data produced by the steps, that can be transferred to other steps.
func NewStreamableArtifactSource ¶
func NewStreamableArtifactSource( artifact runtime.Artifact, volume Volume, compression compression.Compression, enabledP2pStreaming bool, p2pStreamingTimeout time.Duration, ) StreamableArtifactSource
type TaskResult ¶
type TaskResult struct { ExitStatus int VolumeMounts []VolumeMount }
type Volume ¶
type Volume interface { Handle() string Path() string SetProperty(key string, value string) error Properties() (baggageclaim.VolumeProperties, error) SetPrivileged(bool) error StreamIn(ctx context.Context, path string, encoding baggageclaim.Encoding, tarStream io.Reader) error StreamOut(ctx context.Context, path string, encoding baggageclaim.Encoding) (io.ReadCloser, error) GetStreamInP2pUrl(ctx context.Context, path string) (string, error) StreamP2pOut(ctx context.Context, path string, destUrl string, encoding baggageclaim.Encoding) error COWStrategy() baggageclaim.COWStrategy InitializeResourceCache(db.UsedResourceCache) error GetResourceCacheID() int InitializeTaskCache(logger lager.Logger, jobID int, stepName string, path string, privileged bool) error InitializeArtifact(name string, buildID int) (db.WorkerArtifact, error) CreateChildForContainer(db.CreatingContainer, string) (db.CreatingVolume, error) WorkerName() string Destroy() error }
func NewVolume ¶
func NewVolume( bcVolume baggageclaim.Volume, dbVolume db.CreatedVolume, volumeClient VolumeClient, ) Volume
type VolumeClient ¶
type VolumeClient interface { FindOrCreateVolumeForContainer( lager.Logger, VolumeSpec, db.CreatingContainer, int, string, ) (Volume, error) FindOrCreateCOWVolumeForContainer( lager.Logger, VolumeSpec, db.CreatingContainer, Volume, int, string, ) (Volume, error) FindOrCreateVolumeForBaseResourceType( lager.Logger, VolumeSpec, int, string, ) (Volume, error) CreateVolume( lager.Logger, VolumeSpec, int, string, db.VolumeType, ) (Volume, error) FindVolumeForResourceCache( lager.Logger, db.UsedResourceCache, ) (Volume, bool, error) FindVolumeForTaskCache( logger lager.Logger, teamID int, jobID int, stepName string, path string, ) (Volume, bool, error) CreateVolumeForTaskCache( logger lager.Logger, volumeSpec VolumeSpec, teamID int, jobID int, stepName string, path string, ) (Volume, error) FindOrCreateVolumeForResourceCerts( logger lager.Logger, ) (volume Volume, found bool, err error) LookupVolume(lager.Logger, string) (Volume, bool, error) }
func NewVolumeClient ¶
func NewVolumeClient( baggageclaimClient baggageclaim.Client, dbWorker db.Worker, clock clock.Clock, lockFactory lock.LockFactory, dbVolumeRepository db.VolumeRepository, dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory, dbTaskCacheFactory db.TaskCacheFactory, dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory, ) VolumeClient
type VolumeLocalityPlacementStrategyNode ¶
type VolumeLocalityPlacementStrategyNode struct{}
func (*VolumeLocalityPlacementStrategyNode) Choose ¶
func (strategy *VolumeLocalityPlacementStrategyNode) Choose(logger lager.Logger, workers []Worker, spec ContainerSpec) ([]Worker, error)
func (*VolumeLocalityPlacementStrategyNode) ModifiesActiveTasks ¶
func (strategy *VolumeLocalityPlacementStrategyNode) ModifiesActiveTasks() bool
type VolumeMount ¶
type VolumeProperties ¶
type VolumeSpec ¶
type VolumeSpec struct { Strategy baggageclaim.Strategy Properties VolumeProperties Privileged bool TTL time.Duration }
type Worker ¶
type Worker interface { BuildContainers() int Description() string Name() string ResourceTypes() []atc.WorkerResourceType Tags() atc.Tags Uptime() time.Duration IsOwnedByTeam() bool Ephemeral() bool IsVersionCompatible(lager.Logger, version.Version) bool Satisfies(lager.Logger, WorkerSpec) bool FindContainerByHandle(lager.Logger, int, string) (Container, bool, error) FindOrCreateContainer( context.Context, lager.Logger, db.ContainerOwner, db.ContainerMetadata, ContainerSpec, ) (Container, error) FindVolumeForResourceCache(logger lager.Logger, resourceCache db.UsedResourceCache) (Volume, bool, error) FindResourceCacheForVolume(volume Volume) (db.UsedResourceCache, bool, error) FindVolumeForTaskCache(lager.Logger, int, int, string, string) (Volume, bool, error) Fetch( context.Context, lager.Logger, db.ContainerMetadata, Worker, ContainerSpec, runtime.ProcessSpec, resource.Resource, db.ContainerOwner, db.UsedResourceCache, string, ) (GetResult, Volume, error) CertsVolume(lager.Logger) (volume Volume, found bool, err error) LookupVolume(lager.Logger, string) (Volume, bool, error) CreateVolume(logger lager.Logger, spec VolumeSpec, teamID int, volumeType db.VolumeType) (Volume, error) GardenClient() gclient.Client ActiveTasks() (int, error) IncreaseActiveTasks() error DecreaseActiveTasks() error }
func NewGardenWorker ¶
func NewGardenWorker( gardenClient gclient.Client, volumeRepository db.VolumeRepository, volumeClient VolumeClient, imageFactory ImageFactory, fetcher Fetcher, dbTeamFactory db.TeamFactory, dbWorker db.Worker, resourceCacheFactory db.ResourceCacheFactory, numBuildContainers int, ) Worker
NewGardenWorker constructs a Worker using the gardenWorker runtime implementation and allows container and volume creation on a specific Garden worker. A Garden Worker is comprised of: db.Worker, garden Client, container provider, and a volume client
type WorkerProvider ¶
type WorkerProvider interface { RunningWorkers(lager.Logger) ([]Worker, error) FindWorkerForContainer( logger lager.Logger, teamID int, handle string, ) (Worker, bool, error) FindWorkerForVolume( logger lager.Logger, teamID int, handle string, ) (Worker, bool, error) FindWorkersForContainerByOwner( logger lager.Logger, owner db.ContainerOwner, ) ([]Worker, error) NewGardenWorker( logger lager.Logger, savedWorker db.Worker, numBuildWorkers int, ) Worker }
func NewDBWorkerProvider ¶
func NewDBWorkerProvider( lockFactory lock.LockFactory, retryBackOffFactory retryhttp.BackOffFactory, fetcher Fetcher, imageFactory ImageFactory, dbResourceCacheFactory db.ResourceCacheFactory, dbResourceConfigFactory db.ResourceConfigFactory, dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory, dbTaskCacheFactory db.TaskCacheFactory, dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory, dbVolumeRepository db.VolumeRepository, dbTeamFactory db.TeamFactory, workerFactory db.WorkerFactory, workerVersion version.Version, baggageclaimResponseHeaderTimeout, gardenRequestTimeout time.Duration, ) WorkerProvider
type WorkerSpec ¶
func (WorkerSpec) Description ¶
func (spec WorkerSpec) Description() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
connection/connectionfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |
gclientfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |
transportfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |
Code generated by counterfeiter.
|
Code generated by counterfeiter. |