worker

package
v0.0.0-...-de86ced
Published: Jun 1, 2017 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

const ImageMetadataFile = "metadata.json"

const RawRootFSScheme = "raw"

Variables

var (
	ErrNoWorkers     = errors.New("no workers")
	ErrMissingWorker = errors.New("worker for container is missing")
)

var ErrBaseResourceTypeNotFound = errors.New("base-resource-type-not-found")

var ErrCreatedVolumeNotFound = errors.New("failed-to-find-created-volume-in-baggageclaim")

var ErrDesiredWorkerNotRunning = errors.New("desired garden worker is not known to be running")

var ErrIncompatiblePlatform = errors.New("incompatible platform")

var ErrMismatchedTags = errors.New("mismatched tags")

var ErrMissingVolume = errors.New("volume mounted to container is missing")

var ErrNoVolumeManager = errors.New("worker does not support volume management")

var ErrNotImplemented = errors.New("Not implemented")

var ErrTeamMismatch = errors.New("mismatched team")

var ErrUnsupportedResourceType = errors.New("unsupported resource type")

var ErrVolumeExpiredImmediately = errors.New("volume expired immediately after saving")

Functions

func NewHardcoded

func NewHardcoded(
	logger lager.Logger,
	workerFactory dbng.WorkerFactory,
	clock c.Clock,
	gardenAddr string,
	baggageclaimURL string,
	resourceTypes []atc.WorkerResourceType,
) ifrit.RunFunc

Types

type ArtifactDestination

type ArtifactDestination interface {
	// StreamIn is called with a destination directory and the tar stream to
	// expand into the destination directory.
	StreamIn(string, io.Reader) error
}

ArtifactDestination is the inverse of ArtifactSource. This interface allows the receiving end to determine the location of the data, e.g. based on a task's input configuration.
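For illustration, here is a minimal sketch of an ArtifactDestination that expands incoming tar streams beneath a local root directory. The dirDestination type and its behaviour are hypothetical and not part of this package; only directories and regular files are handled.

import (
	"archive/tar"
	"io"
	"os"
	"path/filepath"
)

// dirDestination is a hypothetical ArtifactDestination that expands tar
// streams under a local root directory.
type dirDestination struct {
	root string
}

func (d dirDestination) StreamIn(dest string, tarStream io.Reader) error {
	tr := tar.NewReader(tarStream)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		target := filepath.Join(d.root, dest, hdr.Name)

		switch hdr.Typeflag {
		case tar.TypeDir:
			if err := os.MkdirAll(target, 0755); err != nil {
				return err
			}
		case tar.TypeReg:
			if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
				return err
			}
			file, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(hdr.Mode))
			if err != nil {
				return err
			}
			if _, err := io.Copy(file, tr); err != nil {
				file.Close()
				return err
			}
			file.Close()
		}
	}
}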

type ArtifactName

type ArtifactName string

ArtifactName is just a string, with its own type to make interfaces using it more self-documenting.

type ArtifactRepository

type ArtifactRepository struct {
	// contains filtered or unexported fields
}

ArtifactRepository is the mapping from an ArtifactName to an ArtifactSource. Steps both populate this map with new artifacts (e.g. the resource fetched by a Get step) and look up required artifacts (e.g. the inputs configured for a Task step).

There is only one ArtifactRepository for the duration of a build plan's execution.

ArtifactRepository is, itself, an ArtifactSource. As an ArtifactSource it acts as the set of all ArtifactSources it contains, as if they were each in subdirectories corresponding to their ArtifactName.
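A hypothetical sketch of how steps might populate and query the repository, using the constructor and methods documented below (the artifact names and the getSource/taskSource values are illustrative):

func registerAndLookUp(getSource, taskSource ArtifactSource) (*ArtifactRepository, error) {
	repo := NewArtifactRepository()

	// A Get step registers the resource it fetched; a Task step registers its outputs.
	repo.RegisterSource("some-resource", getSource)
	repo.RegisterSource("built-artifact", taskSource)

	// A later step looks up an artifact it depends on.
	if _, found := repo.SourceFor("some-resource"); !found {
		return nil, errors.New("some-resource was never registered")
	}

	// A Put step can restrict the repository to just the names it needs.
	return repo.ScopedTo("some-resource", "built-artifact")
}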

func NewArtifactRepository

func NewArtifactRepository() *ArtifactRepository

NewArtifactRepository constructs a new repository.

func (*ArtifactRepository) AsMap

func (repo *ArtifactRepository) AsMap() map[ArtifactName]ArtifactSource

AsMap extracts the current contents of the ArtifactRepository into a new map and returns it. Changes to the returned map or the ArtifactRepository will not affect each other.

func (*ArtifactRepository) RegisterSource

func (repo *ArtifactRepository) RegisterSource(name ArtifactName, source ArtifactSource)

RegisterSource inserts an ArtifactSource into the map under the given ArtifactName. Producers of artifacts, e.g. the Get step and the Task step, will call this after they've successfully produced their artifact(s).

func (*ArtifactRepository) ScopedTo

func (repo *ArtifactRepository) ScopedTo(names ...ArtifactName) (*ArtifactRepository, error)

ScopedTo returns a new ArtifactRepository restricted to the given set of ArtifactNames. This is used by the Put step to stream in the sources that did not have a volume available on its destination.

func (*ArtifactRepository) SourceFor

func (repo *ArtifactRepository) SourceFor(name ArtifactName) (ArtifactSource, bool)

SourceFor looks up an ArtifactSource for the given ArtifactName. Consumers of artifacts, e.g. the Task step, will call this to locate their dependencies.

func (*ArtifactRepository) StreamFile

func (repo *ArtifactRepository) StreamFile(path string) (io.ReadCloser, error)

StreamFile streams a single file out of the repository, using the first path segment to determine the ArtifactSource to stream out of. For example, StreamFile("a/b.yml") will look up the "a" ArtifactSource and return the result of StreamFile("b.yml") on it.

If the ArtifactSource determined by the path is not present, FileNotFoundError will be returned.
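A hypothetical sketch of loading a task configuration placed under a "some-resource" artifact (ioutil is used for brevity; error handling is minimal):

func loadTaskConfig(repo *ArtifactRepository) ([]byte, error) {
	rc, err := repo.StreamFile("some-resource/task.yml")
	if err != nil {
		// A FileNotFoundError may mean the first path segment did not match a
		// registered ArtifactName, or that the file is missing from that source.
		return nil, err
	}
	defer rc.Close()

	return ioutil.ReadAll(rc)
}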

func (*ArtifactRepository) StreamTo

func (repo *ArtifactRepository) StreamTo(dest ArtifactDestination) error

StreamTo will stream all currently registered artifacts to the destination. This is used by the Put step, which currently does not have an explicit set of dependencies, and instead just pulls in everything.

Each ArtifactSource will be streamed to a subdirectory matching its ArtifactName.

func (*ArtifactRepository) VolumeOn

func (repo *ArtifactRepository) VolumeOn(worker Worker) (Volume, bool, error)

VolumeOn returns nothing, as it's impossible for there to be a single volume representing all ArtifactSources.

type ArtifactSource

type ArtifactSource interface {
	// StreamTo copies the data from the source to the destination. Note that
	// this potentially uses a lot of network transfer, for larger artifacts, as
	// the ATC will effectively act as a middleman.
	StreamTo(ArtifactDestination) error

	// StreamFile returns the contents of a single file in the artifact source.
	// This is used for loading a task's configuration at runtime.
	//
	// If the file cannot be found, FileNotFoundError should be returned.
	StreamFile(path string) (io.ReadCloser, error)

	// VolumeOn attempts to locate a volume equivalent to this source on the
	// given worker. If a volume can be found, it will be used directly. If not,
	// `StreamTo` will be used to copy the data to the destination instead.
	VolumeOn(Worker) (Volume, bool, error)
}

ArtifactSource represents data produced by steps that can be transferred to other steps.
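A hypothetical consumer of an ArtifactSource would typically prefer a worker-local volume and fall back to streaming, as the VolumeOn comment above describes:

func provideInput(source ArtifactSource, chosenWorker Worker, dest ArtifactDestination) (Volume, bool, error) {
	volume, found, err := source.VolumeOn(chosenWorker)
	if err != nil {
		return nil, false, err
	}

	if found {
		// The data already lives on the chosen worker; the caller can mount
		// this volume (typically copy-on-write) instead of copying it.
		return volume, true, nil
	}

	// No local volume; copy the data over the network via the ATC.
	return nil, false, source.StreamTo(dest)
}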

type Client

type Client interface {
	FindOrCreateBuildContainer(
		lager.Logger,
		<-chan os.Signal,
		ImageFetchingDelegate,
		int,
		atc.PlanID,
		dbng.ContainerMetadata,
		ContainerSpec,
		atc.VersionedResourceTypes,
	) (Container, error)

	CreateResourceGetContainer(
		logger lager.Logger,
		resourceUser dbng.ResourceUser,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		metadata dbng.ContainerMetadata,
		spec ContainerSpec,
		resourceTypes atc.VersionedResourceTypes,
		resourceType string,
		version atc.Version,
		source atc.Source,
		params atc.Params,
	) (Container, error)

	FindOrCreateResourceCheckContainer(
		logger lager.Logger,
		resourceUser dbng.ResourceUser,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		metadata dbng.ContainerMetadata,
		spec ContainerSpec,
		resourceTypes atc.VersionedResourceTypes,
		resourceType string,
		source atc.Source,
	) (Container, error)

	CreateVolumeForResourceCache(
		logger lager.Logger,
		vs VolumeSpec,
		resourceCache *dbng.UsedResourceCache,
	) (Volume, error)

	FindInitializedVolumeForResourceCache(
		logger lager.Logger,
		resourceCache *dbng.UsedResourceCache,
	) (Volume, bool, error)

	FindContainerByHandle(lager.Logger, int, string) (Container, bool, error)
	FindResourceTypeByPath(path string) (atc.WorkerResourceType, bool)
	LookupVolume(lager.Logger, string) (Volume, bool, error)

	Satisfying(lager.Logger, WorkerSpec, atc.VersionedResourceTypes) (Worker, error)
	AllSatisfying(lager.Logger, WorkerSpec, atc.VersionedResourceTypes) ([]Worker, error)
	RunningWorkers(lager.Logger) ([]Worker, error)
}

func NewPool

func NewPool(provider WorkerProvider) Client
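A hypothetical sketch of using the pool Client returned by NewPool to select a worker; the logger, provider, and resourceTypes values are assumed to come from the caller, and the spec fields are illustrative:

func pickWorker(logger lager.Logger, provider WorkerProvider, resourceTypes atc.VersionedResourceTypes) (Worker, error) {
	pool := NewPool(provider)

	spec := WorkerSpec{
		Platform: "linux",
		Tags:     []string{"vsphere"},
		TeamID:   42,
	}

	// Satisfying returns an error (e.g. ErrNoWorkers or a
	// NoCompatibleWorkersError) when no running worker matches the spec.
	return pool.Satisfying(logger, spec, resourceTypes)
}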

type Container

type Container interface {
	garden.Container

	Destroy() error

	VolumeMounts() []VolumeMount

	WorkerName() string

	MarkAsHijacked() error
}

type ContainerProvider

type ContainerProvider interface {
	FindCreatedContainerByHandle(
		logger lager.Logger,
		handle string,
		teamID int,
	) (Container, bool, error)

	FindOrCreateBuildContainer(
		logger lager.Logger,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		buildID int,
		planID atc.PlanID,
		metadata dbng.ContainerMetadata,
		spec ContainerSpec,
		resourceTypes atc.VersionedResourceTypes,
	) (Container, error)

	FindOrCreateResourceCheckContainer(
		logger lager.Logger,
		resourceUser dbng.ResourceUser,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		metadata dbng.ContainerMetadata,
		spec ContainerSpec,
		resourceTypes atc.VersionedResourceTypes,
		resourceType string,
		source atc.Source,
	) (Container, error)

	CreateResourceGetContainer(
		logger lager.Logger,
		resourceUser dbng.ResourceUser,
		cancel <-chan os.Signal,
		delegate ImageFetchingDelegate,
		metadata dbng.ContainerMetadata,
		spec ContainerSpec,
		resourceTypes atc.VersionedResourceTypes,
		resourceTypeName string,
		version atc.Version,
		source atc.Source,
		params atc.Params,
	) (Container, error)
}

type ContainerProviderFactory

type ContainerProviderFactory interface {
	ContainerProviderFor(Worker) ContainerProvider
}

func NewContainerProviderFactory

func NewContainerProviderFactory(
	gardenClient garden.Client,
	baggageclaimClient baggageclaim.Client,
	volumeClient VolumeClient,
	imageFactory ImageFactory,
	dbVolumeFactory dbng.VolumeFactory,
	dbResourceCacheFactory dbng.ResourceCacheFactory,
	dbResourceConfigFactory dbng.ResourceConfigFactory,
	dbTeamFactory dbng.TeamFactory,
	lockDB LockDB,
	httpProxyURL string,
	httpsProxyURL string,
	noProxy string,
	clock clock.Clock,
) ContainerProviderFactory

type ContainerSpec

type ContainerSpec struct {
	Platform  string
	Tags      []string
	TeamID    int
	ImageSpec ImageSpec
	Env       []string

	// Working directory for processes run in the container.
	Dir string

	// Inputs to provide to the container. Inputs with a volume local to the
	// selected worker will be made available via a COW volume; others will be
	// streamed.
	Inputs []InputSource

	// Outputs for which volumes should be created and mounted into the container.
	Outputs OutputPaths

	// A pre-created resource cache volume to be mounted into the container.
	ResourceCache *VolumeMount

	// Optional user to run processes as. Overwrites the one specified in the docker image.
	User string
}

func (ContainerSpec) WorkerSpec

func (spec ContainerSpec) WorkerSpec() WorkerSpec
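A hypothetical spec for a resource check container using the worker's base "git" resource type; the field values are illustrative:

func checkContainerSpec() (ContainerSpec, WorkerSpec) {
	spec := ContainerSpec{
		Platform:  "linux",
		Tags:      []string{"vsphere"},
		TeamID:    42,
		ImageSpec: ImageSpec{ResourceType: "git"},
		Env:       []string{"LANG=C"},
		Dir:       "/tmp/build/check",
	}

	// WorkerSpec reduces the container spec to the scheduling-relevant
	// fields used when choosing a worker for this container.
	return spec, spec.WorkerSpec()
}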

type FetchedImage

type FetchedImage struct {
	Metadata   ImageMetadata
	Version    atc.Version
	URL        string
	Privileged bool
}

type FileNotFoundError

type FileNotFoundError struct {
	Path string
}

FileNotFoundError is the error to return from StreamFile when the given path does not exist.

func (FileNotFoundError) Error

func (err FileNotFoundError) Error() string

Error prints a helpful message including the file path. The user will see this message if e.g. their task config path does not exist.

type GardenConnectionFactory

type GardenConnectionFactory interface {
	BuildConnection() gconn.Connection
}

func NewGardenConnectionFactory

func NewGardenConnectionFactory(
	db transport.TransportDB,
	logger lager.Logger,
	workerName string,
	workerHost *string,
	retryBackOffFactory retryhttp.BackOffFactory,
) GardenConnectionFactory

type HostRootFSStrategy

type HostRootFSStrategy struct {
	Path       string
	WorkerName string
	Version    *string
}

type Image

type Image interface {
	FetchForContainer(
		logger lager.Logger,
		container dbng.CreatingContainer,
	) (FetchedImage, error)
}

type ImageFactory

type ImageFactory interface {
	GetImage(
		logger lager.Logger,
		workerClient Worker,
		volumeClient VolumeClient,
		imageSpec ImageSpec,
		teamID int,
		signals <-chan os.Signal,
		delegate ImageFetchingDelegate,
		resourceUser dbng.ResourceUser,
		resourceTypes atc.VersionedResourceTypes,
	) (Image, error)
}

type ImageFetchingDelegate

type ImageFetchingDelegate interface {
	Stderr() io.Writer
	ImageVersionDetermined(ResourceCacheIdentifier) error
}

type ImageMetadata

type ImageMetadata struct {
	Env  []string `json:"env"`
	User string   `json:"user"`
}

type ImageSpec

type ImageSpec struct {
	ResourceType        string
	ImageURL            string
	ImageResource       *atc.ImageResource
	ImageArtifactSource ArtifactSource
	ImageArtifactName   ArtifactName
	Privileged          bool
}

type InputSource

type InputSource interface {
	Name() ArtifactName
	Source() ArtifactSource
	DestinationPath() string
}
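A minimal hypothetical implementation, pairing a registered artifact with the path at which a task expects it inside the container:

type taskInput struct {
	name ArtifactName
	src  ArtifactSource
	path string
}

func (i taskInput) Name() ArtifactName      { return i.name }
func (i taskInput) Source() ArtifactSource  { return i.src }
func (i taskInput) DestinationPath() string { return i.path }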

type LockDB

type LockDB interface {
	AcquireVolumeCreatingLock(lager.Logger, int) (lock.Lock, bool, error)
	AcquireContainerCreatingLock(lager.Logger, int) (lock.Lock, bool, error)
}

type MalformedMetadataError

type MalformedMetadataError struct {
	UnmarshalError error
}

func (MalformedMetadataError) Error

func (err MalformedMetadataError) Error() string

type NoCompatibleWorkersError

type NoCompatibleWorkersError struct {
	Spec    WorkerSpec
	Workers []Worker
}

func (NoCompatibleWorkersError) Error

func (err NoCompatibleWorkersError) Error() string

type NoopImageFetchingDelegate

type NoopImageFetchingDelegate struct{}

func (NoopImageFetchingDelegate) ImageVersionDetermined

func (NoopImageFetchingDelegate) Stderr

type OutputPaths

type OutputPaths map[string]string

OutputPaths is a mapping from output name to its path in the container.

type ResourceCacheIdentifier

type ResourceCacheIdentifier db.ResourceCacheIdentifier

type RetryableConnection

type RetryableConnection struct {
	gconn.Connection
}

func NewRetryableConnection

func NewRetryableConnection(connection gconn.Connection) *RetryableConnection

func (*RetryableConnection) Attach

func (conn *RetryableConnection) Attach(handle string, processID string, processIO garden.ProcessIO) (garden.Process, error)

func (*RetryableConnection) Run

func (conn *RetryableConnection) Run(handle string, processSpec garden.ProcessSpec, processIO garden.ProcessIO) (garden.Process, error)

type Sleeper

type Sleeper interface {
	Sleep(time.Duration)
}

type Volume

type Volume interface {
	Handle() string
	Path() string

	SetProperty(key string, value string) error
	Properties() (baggageclaim.VolumeProperties, error)

	SetPrivileged(bool) error

	StreamIn(path string, tarStream io.Reader) error
	StreamOut(path string) (io.ReadCloser, error)

	COWStrategy() baggageclaim.COWStrategy

	IsInitialized() (bool, error)
	Initialize() error

	CreateChildForContainer(dbng.CreatingContainer, string) (dbng.CreatingVolume, error)

	Destroy() error
}
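For example, the contents of one volume could be copied into another via tar streams (a hypothetical helper; both volumes are assumed to already exist):

func copyVolume(src Volume, dst Volume) error {
	out, err := src.StreamOut(".")
	if err != nil {
		return err
	}
	defer out.Close()

	// StreamOut produces a tar stream, matching what StreamIn expects.
	return dst.StreamIn(".", out)
}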

func NewVolume

func NewVolume(
	bcVolume baggageclaim.Volume,
	dbVolume dbng.CreatedVolume,
) Volume

type VolumeClient

type VolumeClient interface {
	CreateVolumeForResourceCache(
		lager.Logger,
		VolumeSpec,
		*dbng.UsedResourceCache,
	) (Volume, error)
	FindOrCreateVolumeForContainer(
		lager.Logger,
		VolumeSpec,
		dbng.CreatingContainer,
		int,
		string,
	) (Volume, error)
	FindOrCreateCOWVolumeForContainer(
		lager.Logger,
		VolumeSpec,
		dbng.CreatingContainer,
		Volume,
		int,
		string,
	) (Volume, error)
	FindOrCreateVolumeForBaseResourceType(
		lager.Logger,
		VolumeSpec,
		int,
		string,
	) (Volume, error)
	FindInitializedVolumeForResourceCache(
		lager.Logger,
		*dbng.UsedResourceCache,
	) (Volume, bool, error)
	LookupVolume(lager.Logger, string) (Volume, bool, error)
}

func NewVolumeClient

func NewVolumeClient(
	baggageclaimClient baggageclaim.Client,
	lockDB LockDB,
	dbVolumeFactory dbng.VolumeFactory,
	dbWorkerBaseResourceTypeFactory dbng.WorkerBaseResourceTypeFactory,
	clock clock.Clock,
	dbWorker dbng.Worker,
) VolumeClient

type VolumeMount

type VolumeMount struct {
	Volume    Volume
	MountPath string
}

type VolumeProperties

type VolumeProperties map[string]string

type VolumeSpec

type VolumeSpec struct {
	Strategy   baggageclaim.Strategy
	Properties VolumeProperties
	Privileged bool
	TTL        time.Duration
}
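A hypothetical spec for a copy-on-write child of an existing volume; the property key and TTL are illustrative:

func cowSpecFor(parent Volume) VolumeSpec {
	return VolumeSpec{
		Strategy:   parent.COWStrategy(),
		Properties: VolumeProperties{"resource-type": "git"},
		Privileged: false,
		TTL:        5 * time.Minute,
	}
}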

type Worker

type Worker interface {
	Client

	ActiveContainers() int

	Description() string
	Name() string
	ResourceTypes() []atc.WorkerResourceType
	Tags() atc.Tags
	Uptime() time.Duration
	IsOwnedByTeam() bool
	IsVersionCompatible(lager.Logger, *version.Version) bool
}

func NewGardenWorker

func NewGardenWorker(
	containerProviderFactory ContainerProviderFactory,
	volumeClient VolumeClient,
	lockDB LockDB,
	provider WorkerProvider,
	clock clock.Clock,
	activeContainers int,
	resourceTypes []atc.WorkerResourceType,
	platform string,
	tags atc.Tags,
	teamID int,
	name string,
	startTime int64,
	version *string,
) Worker

type WorkerProvider

type WorkerProvider interface {
	RunningWorkers(lager.Logger) ([]Worker, error)

	FindWorkerForContainer(
		logger lager.Logger,
		teamID int,
		handle string,
	) (Worker, bool, error)

	FindWorkerForResourceCheckContainer(
		logger lager.Logger,
		teamID int,
		resourceUser dbng.ResourceUser,
		resourceType string,
		resourceSource atc.Source,
		types atc.VersionedResourceTypes,
	) (Worker, bool, error)

	FindWorkerForBuildContainer(
		logger lager.Logger,
		teamID int,
		buildID int,
		planID atc.PlanID,
	) (Worker, bool, error)
}

func NewDBWorkerProvider

func NewDBWorkerProvider(
	lockDB LockDB,
	retryBackOffFactory retryhttp.BackOffFactory,
	imageFactory ImageFactory,
	dbResourceCacheFactory dbng.ResourceCacheFactory,
	dbResourceConfigFactory dbng.ResourceConfigFactory,
	dbWorkerBaseResourceTypeFactory dbng.WorkerBaseResourceTypeFactory,
	dbVolumeFactory dbng.VolumeFactory,
	dbTeamFactory dbng.TeamFactory,
	workerFactory dbng.WorkerFactory,
	workerVersion *version.Version,
) WorkerProvider

type WorkerSpec

type WorkerSpec struct {
	Platform     string
	ResourceType string
	Tags         []string
	TeamID       int
}

func (WorkerSpec) Description

func (spec WorkerSpec) Description() string

Directories

Path            Synopsis
imagefakes      Code generated by counterfeiter.
transportfakes  Code generated by counterfeiter.
