worker

package
v4.2.2+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2018 License: Apache-2.0 Imports: 29 Imported by: 461

Documentation

Index

Constants

This section is empty.

Variables

View Source
// The Err* values below are sentinel errors created with errors.New; callers
// can compare a returned error against them by identity.
// NOTE(review): the call sites that return them are not visible here; the
// descriptions are taken from the error messages only.

// ErrBaseResourceTypeNotFound reports that a base resource type was not found.
var ErrBaseResourceTypeNotFound = errors.New("base resource type not found")
View Source
// ErrIncompatiblePlatform reports an incompatible platform.
var ErrIncompatiblePlatform = errors.New("incompatible platform")
View Source
// ErrMismatchedTags reports mismatched tags.
var ErrMismatchedTags = errors.New("mismatched tags")
View Source
// ErrMissingVolume reports that a volume mounted to a container is missing.
var ErrMissingVolume = errors.New("volume mounted to container is missing")
View Source
var (
	// ErrNoWorkers reports that there are no workers.
	ErrNoWorkers = errors.New("no workers")
)
View Source
// ErrNotImplemented reports functionality that is not implemented.
// NOTE(review): the message is capitalized, unlike the other sentinels and Go
// convention; left as-is because callers or tests may match on the text.
var ErrNotImplemented = errors.New("Not implemented")
View Source
// ErrTeamMismatch reports a mismatched team.
var ErrTeamMismatch = errors.New("mismatched team")
View Source
// ErrUnsupportedResourceType reports an unsupported resource type.
var ErrUnsupportedResourceType = errors.New("unsupported resource type")
View Source
// GardenLimitDefault is a uint64 zero.
// NOTE(review): presumably the value ContainerLimits.ToGardenLimits uses when
// a limit is unset — confirm against its implementation.
var GardenLimitDefault = uint64(0)

Functions

func NewHardcoded

func NewHardcoded(
	logger lager.Logger,
	workerFactory db.WorkerFactory,
	clock c.Clock,
	gardenAddr string,
	baggageclaimURL string,
	resourceTypes []atc.WorkerResourceType,
) ifrit.RunFunc

Types

type ArtifactDestination

type ArtifactDestination interface {
	// StreamIn is called with a destination directory and the tar stream to
	// expand into the destination directory.
	StreamIn(string, io.Reader) error
}

ArtifactDestination is the inverse of ArtifactSource. This interface allows the receiving end to determine the location of the data, e.g. based on a task's input configuration.

type ArtifactName

type ArtifactName string

ArtifactName is just a string, with its own type to make interfaces using it more self-documenting.

type ArtifactRepository

type ArtifactRepository struct {
	// contains filtered or unexported fields
}

ArtifactRepository is the mapping from an ArtifactName to an ArtifactSource. Steps will both populate this map with new artifacts (e.g. the resource fetched by a Get step), and look up required artifacts (e.g. the inputs configured for a Task step).

There is only one ArtifactRepository for the duration of a build plan's execution.

ArtifactRepository is, itself, an ArtifactSource. As an ArtifactSource it acts as the set of all ArtifactSources it contains, as if they were each in subdirectories corresponding to their ArtifactName.

func NewArtifactRepository

func NewArtifactRepository() *ArtifactRepository

NewArtifactRepository constructs a new repository.

func (*ArtifactRepository) AsMap

AsMap extracts the current contents of the ArtifactRepository into a new map and returns it. Changes to the returned map or the ArtifactRepository will not affect each other.

func (*ArtifactRepository) RegisterSource

func (repo *ArtifactRepository) RegisterSource(name ArtifactName, source ArtifactSource)

RegisterSource inserts an ArtifactSource into the map under the given ArtifactName. Producers of artifacts, e.g. the Get step and the Task step, will call this after they've successfully produced their artifact(s).

func (*ArtifactRepository) SourceFor

func (repo *ArtifactRepository) SourceFor(name ArtifactName) (ArtifactSource, bool)

SourceFor looks up a Source for the given ArtifactName. Consumers of artifacts, e.g. the Task step, will call this to locate their dependencies.

func (*ArtifactRepository) StreamFile

func (repo *ArtifactRepository) StreamFile(path string) (io.ReadCloser, error)

StreamFile streams a single file out of the repository, using the first path segment to determine the ArtifactSource to stream out of. For example, StreamFile("a/b.yml") will look up the "a" ArtifactSource and return the result of StreamFile("b.yml") on it.

If the ArtifactSource determined by the path is not present, FileNotFoundError will be returned.

func (*ArtifactRepository) StreamTo

func (repo *ArtifactRepository) StreamTo(dest ArtifactDestination) error

StreamTo will stream all currently registered artifacts to the destination. This is used by the Put step, which currently does not have an explicit set of dependencies, and instead just pulls in everything.

Each ArtifactSource will be streamed to a subdirectory matching its ArtifactName.

func (*ArtifactRepository) VolumeOn

func (repo *ArtifactRepository) VolumeOn(worker Worker) (Volume, bool, error)

VolumeOn returns nothing, as it's impossible for there to be a single volume representing all ArtifactSources.

type ArtifactSource

type ArtifactSource interface {
	// StreamTo copies the data from the source to the destination. Note that
	// this potentially uses a lot of network transfer, for larger artifacts, as
	// the ATC will effectively act as a middleman.
	StreamTo(ArtifactDestination) error

	// StreamFile returns the contents of a single file in the artifact source.
	// This is used for loading a task's configuration at runtime.
	//
	// If the file cannot be found, FileNotFoundError should be returned.
	StreamFile(path string) (io.ReadCloser, error)

	// VolumeOn attempts to locate a volume equivalent to this source on the
	// given worker. If a volume can be found, it will be used directly. If not,
	// `StreamTo` will be used to copy the data to the destination instead.
	VolumeOn(Worker) (Volume, bool, error)
}

ArtifactSource represents data produced by steps that can be transferred to other steps.

type BindMountSource

// BindMountSource is implemented by anything that can produce a
// garden.BindMount for a given Worker. ContainerSpec.BindMounts holds a slice
// of these.
type BindMountSource interface {
	// VolumeOn returns the bind mount to use on the given worker.
	// NOTE(review): the bool presumably reports whether a mount was found,
	// mirroring ArtifactSource.VolumeOn — confirm against implementations.
	VolumeOn(Worker) (garden.BindMount, bool, error)
}

type CertsVolumeMount

// CertsVolumeMount carries a Logger and declares a VolumeOn method (pointer
// receiver) with the same signature as BindMountSource.VolumeOn, so a
// *CertsVolumeMount can be used wherever a BindMountSource is expected.
// NOTE(review): presumably bind-mounts the worker's certs volume (see
// Worker.CertsVolume) — confirm against the VolumeOn implementation.
type CertsVolumeMount struct {
	Logger lager.Logger
}

func (*CertsVolumeMount) VolumeOn

func (s *CertsVolumeMount) VolumeOn(worker Worker) (garden.BindMount, bool, error)

type Client

// Client is the interface for creating and locating containers and volumes
// and for selecting workers. It is embedded in Worker, and NewPool returns a
// Client built from a WorkerProvider and a ContainerPlacementStrategy.
type Client interface {
	// FindOrCreateContainer returns a Container for the given owner,
	// metadata and spec.
	// NOTE(review): the delegate is passed before the owner here, whereas
	// ContainerProvider.FindOrCreateContainer takes the owner before the
	// delegate — take care when forwarding arguments between the two.
	FindOrCreateContainer(
		context.Context,
		lager.Logger,
		ImageFetchingDelegate,
		db.ContainerOwner,
		db.ContainerMetadata,
		ContainerSpec,
		creds.VersionedResourceTypes,
	) (Container, error)

	// FindContainerByHandle looks up a container.
	// NOTE(review): the unnamed int and string are presumably the team ID
	// and the container handle (compare WorkerProvider.FindWorkerForContainer)
	// — confirm. The bool presumably reports whether it was found.
	FindContainerByHandle(lager.Logger, int, string) (Container, bool, error)

	// LookupVolume looks up a volume.
	// NOTE(review): the string is presumably the volume handle — confirm.
	LookupVolume(lager.Logger, string) (Volume, bool, error)

	// FindResourceTypeByPath returns the worker resource type for a path,
	// with a bool result alongside it (presumably "found").
	FindResourceTypeByPath(path string) (atc.WorkerResourceType, bool)

	// Satisfying returns a single Worker for the given WorkerSpec and
	// resource types; AllSatisfying returns a slice of them.
	Satisfying(lager.Logger, WorkerSpec, creds.VersionedResourceTypes) (Worker, error)
	AllSatisfying(lager.Logger, WorkerSpec, creds.VersionedResourceTypes) ([]Worker, error)
	// RunningWorkers returns a slice of Workers; the same method signature
	// appears on WorkerProvider.
	RunningWorkers(lager.Logger) ([]Worker, error)
}

func NewPool

func NewPool(provider WorkerProvider, strategy ContainerPlacementStrategy) Client

type Container

// Container extends garden.Container (embedded) with worker-level
// operations: destruction, access to its volume mounts, the name of the
// worker it belongs to, and marking it as hijacked.
type Container interface {
	garden.Container

	// Destroy destroys the container.
	// NOTE(review): whether this also removes database state is not visible
	// here.
	Destroy() error

	// VolumeMounts returns the VolumeMount values (volume plus mount path)
	// associated with the container.
	VolumeMounts() []VolumeMount

	// WorkerName returns a worker name as a string, presumably that of the
	// worker hosting the container.
	WorkerName() string

	// MarkAsHijacked records that the container has been hijacked.
	// NOTE(review): the effect of this marking is not visible here.
	MarkAsHijacked() error
}

type ContainerLimits

// ContainerLimits holds CPU and memory limits for a container. Both fields
// are pointers, so either may be nil. It is used as ContainerSpec.Limits and
// converted to garden.Limits by ToGardenLimits.
// NOTE(review): units and nil-handling are defined by ToGardenLimits, whose
// body is not visible here.
type ContainerLimits struct {
	CPU    *uint64
	Memory *uint64
}

func (ContainerLimits) ToGardenLimits

func (cl ContainerLimits) ToGardenLimits() garden.Limits

type ContainerPlacementStrategy

// ContainerPlacementStrategy picks one Worker out of a candidate list for a
// given ContainerSpec. NewRandomPlacementStrategy and
// NewVolumeLocalityPlacementStrategy both return this interface.
type ContainerPlacementStrategy interface {
	// Choose returns the selected worker, or an error.
	Choose([]Worker, ContainerSpec) (Worker, error)
}

func NewRandomPlacementStrategy

func NewRandomPlacementStrategy() ContainerPlacementStrategy

func NewVolumeLocalityPlacementStrategy

func NewVolumeLocalityPlacementStrategy() ContainerPlacementStrategy

type ContainerProvider

// ContainerProvider finds and creates containers. NewContainerProvider builds
// one from a garden client, a baggageclaim client, a VolumeClient and a
// db.Worker (among other dependencies); NewGardenWorker accepts one.
type ContainerProvider interface {
	// FindCreatedContainerByHandle looks up a container by handle and team
	// ID. NOTE(review): the bool presumably reports whether it was found.
	FindCreatedContainerByHandle(
		logger lager.Logger,
		handle string,
		teamID int,
	) (Container, bool, error)

	// FindOrCreateContainer returns a Container for the given owner,
	// metadata and spec.
	// NOTE(review): owner precedes delegate here, the opposite of
	// Client.FindOrCreateContainer.
	FindOrCreateContainer(
		ctx context.Context,
		logger lager.Logger,
		owner db.ContainerOwner,
		delegate ImageFetchingDelegate,
		metadata db.ContainerMetadata,
		spec ContainerSpec,
		resourceTypes creds.VersionedResourceTypes,
	) (Container, error)
}

func NewContainerProvider

func NewContainerProvider(
	gardenClient garden.Client,
	baggageclaimClient baggageclaim.Client,
	volumeClient VolumeClient,
	dbWorker db.Worker,
	clock clock.Clock,

	imageFactory ImageFactory,
	dbVolumeRepository db.VolumeRepository,
	dbTeamFactory db.TeamFactory,
	lockFactory lock.LockFactory,
) ContainerProvider

type ContainerSpec

// ContainerSpec describes a container to create: where it may run, which
// image it uses, its environment, inputs, outputs, limits and mounts. Its
// WorkerSpec method derives a WorkerSpec from it.
type ContainerSpec struct {
	// Platform, Tags and TeamID share their names with WorkerSpec fields.
	// NOTE(review): presumably these are what WorkerSpec() copies over —
	// confirm against its implementation.
	Platform  string
	Tags      []string
	TeamID    int
	ImageSpec ImageSpec
	Env       []string

	// Working directory for processes run in the container.
	Dir string

	// Inputs to provide to the container. Inputs with a volume local to the
	// selected worker will be made available via a COW volume; others will be
	// streamed.
	Inputs []InputSource

	// Outputs for which volumes should be created and mounted into the container.
	Outputs OutputPaths

	// Resource limits to be set on the container when creating in garden.
	Limits ContainerLimits

	// Local volumes to bind mount directly to the container when creating in garden.
	BindMounts []BindMountSource

	// Optional user to run processes as. Overrides the one specified in the docker image.
	User string
}

func (ContainerSpec) WorkerSpec

func (spec ContainerSpec) WorkerSpec() WorkerSpec

type ErrCreatedVolumeNotFound

// ErrCreatedVolumeNotFound is an error type (it has an Error method on the
// value receiver) that carries a volume handle and a worker name.
// NOTE(review): presumably returned when a volume recorded as created cannot
// be found on the named worker — confirm at call sites. The name uses the
// Err prefix usually kept for sentinel values; renaming would break callers.
type ErrCreatedVolumeNotFound struct {
	Handle     string
	WorkerName string
}

func (ErrCreatedVolumeNotFound) Error

func (e ErrCreatedVolumeNotFound) Error() string

type FetchedImage

// FetchedImage is the result type of Image.FetchForContainer: the image's
// metadata, its version, a URL, and whether it is privileged.
// NOTE(review): the URL scheme/format is not visible here.
type FetchedImage struct {
	Metadata   ImageMetadata
	Version    atc.Version
	URL        string
	Privileged bool
}

type FileNotFoundError

type FileNotFoundError struct {
	Path string
}

FileNotFoundError is the error to return from StreamFile when the given path does not exist.

func (FileNotFoundError) Error

func (err FileNotFoundError) Error() string

Error prints a helpful message including the file path. The user will see this message if e.g. their task config path does not exist.

type GardenConnectionFactory

// GardenConnectionFactory builds garden connections.
// NewGardenConnectionFactory returns one configured with a transport DB,
// logger, worker name, optional worker host and retry back-off factory.
type GardenConnectionFactory interface {
	// BuildConnection returns a gconn.Connection.
	// NOTE(review): whether each call yields a fresh connection is not
	// visible here.
	BuildConnection() gconn.Connection
}

func NewGardenConnectionFactory

func NewGardenConnectionFactory(
	db transport.TransportDB,
	logger lager.Logger,
	workerName string,
	workerHost *string,
	retryBackOffFactory retryhttp.BackOffFactory,
) GardenConnectionFactory

type Image

// Image is something that can be fetched for a container that is being
// created. ImageFactory.GetImage returns values of this type.
type Image interface {
	// FetchForContainer fetches the image for the given creating container
	// and returns a FetchedImage describing it.
	FetchForContainer(
		ctx context.Context,
		logger lager.Logger,
		container db.CreatingContainer,
	) (FetchedImage, error)
}

type ImageFactory

// ImageFactory produces an Image from an ImageSpec. NewContainerProvider and
// NewDBWorkerProvider both take one as a dependency.
type ImageFactory interface {
	// GetImage returns the Image for imageSpec, given the worker and volume
	// client to use, the team ID, a delegate for fetch output/events, and
	// the versioned resource types.
	GetImage(
		logger lager.Logger,
		workerClient Worker,
		volumeClient VolumeClient,
		imageSpec ImageSpec,
		teamID int,
		delegate ImageFetchingDelegate,
		resourceTypes creds.VersionedResourceTypes,
	) (Image, error)
}

type ImageFetchingDelegate

// ImageFetchingDelegate supplies output writers and a version callback for
// image fetching. It is passed to FindOrCreateContainer and
// ImageFactory.GetImage.
type ImageFetchingDelegate interface {
	// Stdout and Stderr return the writers for fetch output.
	Stdout() io.Writer
	Stderr() io.Writer
	// ImageVersionDetermined is called with the resource cache of the image.
	// NOTE(review): presumably invoked once the image version is resolved —
	// confirm at call sites.
	ImageVersionDetermined(db.UsedResourceCache) error
}

type ImageMetadata

// ImageMetadata is image metadata with JSON keys "env" and "user". It is
// carried in FetchedImage.Metadata.
// NOTE(review): the format of Env entries (presumably "KEY=value") is not
// visible here.
type ImageMetadata struct {
	Env  []string `json:"env"`
	User string   `json:"user"`
}

type ImageResource

// ImageResource describes an image by resource type and source. Params and
// Version are pointers and may be nil. It is referenced from
// ImageSpec.ImageResource.
type ImageResource struct {
	Type    string
	Source  creds.Source
	Params  *atc.Params
	Version *atc.Version
}

type ImageSpec

// ImageSpec describes the image for a container. It can name a resource
// type, an image URL, an ImageResource, or an artifact (source plus name),
// and states whether the image is privileged.
// NOTE(review): presumably only one of these ways is set at a time, and
// precedence between them is decided by the ImageFactory — confirm.
type ImageSpec struct {
	ResourceType        string
	ImageURL            string
	ImageResource       *ImageResource
	ImageArtifactSource ArtifactSource
	ImageArtifactName   ArtifactName
	Privileged          bool
}

type InputSource

// InputSource pairs an ArtifactSource with a destination path.
// ContainerSpec.Inputs holds a slice of these.
type InputSource interface {
	// Source returns the artifact to provide.
	Source() ArtifactSource
	// DestinationPath returns the path for the input.
	// NOTE(review): presumably a path inside the container — confirm.
	DestinationPath() string
}

type NoCompatibleWorkersError

// NoCompatibleWorkersError is an error type (Error is defined on the value
// receiver) that carries a WorkerSpec and a slice of Workers.
// NOTE(review): presumably Spec is the spec that could not be satisfied and
// Workers are the candidates that were considered — confirm at call sites.
type NoCompatibleWorkersError struct {
	Spec    WorkerSpec
	Workers []Worker
}

func (NoCompatibleWorkersError) Error

func (err NoCompatibleWorkersError) Error() string

type NoopImageFetchingDelegate

// NoopImageFetchingDelegate is an empty struct with ImageVersionDetermined,
// Stderr and Stdout methods on the value receiver, the same method names as
// ImageFetchingDelegate.
// NOTE(review): as the name suggests, these presumably do nothing / discard
// output — confirm against the method bodies.
type NoopImageFetchingDelegate struct{}

func (NoopImageFetchingDelegate) ImageVersionDetermined

func (NoopImageFetchingDelegate) ImageVersionDetermined(db.UsedResourceCache) error

func (NoopImageFetchingDelegate) Stderr

func (NoopImageFetchingDelegate) Stdout

type OutputPaths

type OutputPaths map[string]string

OutputPaths is a mapping from output name to its path in the container.

type RandomPlacementStrategy

// RandomPlacementStrategy has a Choose method (pointer receiver) matching
// ContainerPlacementStrategy.Choose, so *RandomPlacementStrategy can be used
// as a ContainerPlacementStrategy.
// NOTE(review): presumably picks a worker at random — confirm against Choose.
type RandomPlacementStrategy struct {
	// contains filtered or unexported fields
}

func (*RandomPlacementStrategy) Choose

func (strategy *RandomPlacementStrategy) Choose(workers []Worker, spec ContainerSpec) (Worker, error)

type RetryableConnection

// RetryableConnection wraps a gconn.Connection by embedding it, and declares
// its own Attach and Run methods; all other methods are promoted from the
// embedded connection. Construct it with NewRetryableConnection.
// NOTE(review): the retry behavior of Attach and Run is not visible here.
type RetryableConnection struct {
	gconn.Connection
}

func NewRetryableConnection

func NewRetryableConnection(connection gconn.Connection) *RetryableConnection

func (*RetryableConnection) Attach

func (conn *RetryableConnection) Attach(handle string, processID string, processIO garden.ProcessIO) (garden.Process, error)

func (*RetryableConnection) Run

func (conn *RetryableConnection) Run(handle string, processSpec garden.ProcessSpec, processIO garden.ProcessIO) (garden.Process, error)

type Volume

// Volume is the worker-level interface to a volume. NewVolume builds one from
// a baggageclaim.Volume, a db.CreatedVolume and a VolumeClient.
type Volume interface {
	// Handle and Path return the volume's handle and path as strings.
	Handle() string
	Path() string

	// SetProperty sets a single key/value property; Properties returns the
	// volume's properties as baggageclaim.VolumeProperties.
	SetProperty(key string, value string) error
	Properties() (baggageclaim.VolumeProperties, error)

	// SetPrivileged sets the volume's privileged flag.
	SetPrivileged(bool) error

	// StreamIn takes a path and a tar stream; StreamOut takes a path and
	// returns a reader that the caller is responsible for closing.
	StreamIn(path string, tarStream io.Reader) error
	StreamOut(path string) (io.ReadCloser, error)

	// COWStrategy returns a baggageclaim.COWStrategy.
	// NOTE(review): presumably a copy-on-write strategy parented on this
	// volume — confirm.
	COWStrategy() baggageclaim.COWStrategy

	// InitializeResourceCache takes a resource cache.
	// InitializeTaskCache takes unnamed (int, string, string, bool)
	// arguments. NOTE(review): presumably job ID, step name, path and
	// privileged, by analogy with VolumeClient.FindVolumeForTaskCache —
	// confirm against the implementation before calling.
	InitializeResourceCache(db.UsedResourceCache) error
	InitializeTaskCache(lager.Logger, int, string, string, bool) error

	// CreateChildForContainer returns a db.CreatingVolume for the given
	// creating container. NOTE(review): the string is presumably the mount
	// path — confirm.
	CreateChildForContainer(db.CreatingContainer, string) (db.CreatingVolume, error)

	// Destroy destroys the volume.
	Destroy() error
}

func NewVolume

func NewVolume(
	bcVolume baggageclaim.Volume,
	dbVolume db.CreatedVolume,
	volumeClient VolumeClient,
) Volume

type VolumeClient

// VolumeClient finds and creates volumes for containers, base resource
// types, resource caches, task caches and resource certs. NewVolumeClient
// builds one from a baggageclaim client, a db.Worker and supporting
// factories.
// NOTE(review): several methods take unnamed (int, string) parameters;
// presumably a team ID followed by a mount path or resource type name —
// confirm against the implementation.
type VolumeClient interface {
	// FindOrCreateVolumeForContainer returns a volume for the given creating
	// container and VolumeSpec.
	FindOrCreateVolumeForContainer(
		lager.Logger,
		VolumeSpec,
		db.CreatingContainer,
		int,
		string,
	) (Volume, error)
	// FindOrCreateCOWVolumeForContainer is like the above but additionally
	// takes a Volume, presumably the parent for a copy-on-write child.
	FindOrCreateCOWVolumeForContainer(
		lager.Logger,
		VolumeSpec,
		db.CreatingContainer,
		Volume,
		int,
		string,
	) (Volume, error)
	// FindOrCreateVolumeForBaseResourceType returns a volume for a base
	// resource type.
	FindOrCreateVolumeForBaseResourceType(
		lager.Logger,
		VolumeSpec,
		int,
		string,
	) (Volume, error)
	// FindVolumeForResourceCache looks up the volume for a resource cache;
	// the bool presumably reports whether one was found.
	FindVolumeForResourceCache(
		lager.Logger,
		db.UsedResourceCache,
	) (Volume, bool, error)
	// FindVolumeForTaskCache looks up the task cache volume identified by
	// team, job, step name and path.
	FindVolumeForTaskCache(
		logger lager.Logger,
		teamID int,
		jobID int,
		stepName string,
		path string,
	) (Volume, bool, error)
	// CreateVolumeForTaskCache creates a task cache volume for the same
	// identifying tuple, using volumeSpec.
	CreateVolumeForTaskCache(
		logger lager.Logger,
		volumeSpec VolumeSpec,
		teamID int,
		jobID int,
		stepName string,
		path string,
	) (Volume, error)
	// FindOrCreateVolumeForResourceCerts returns the resource certs volume.
	// Despite the FindOrCreate name it also returns a found flag.
	FindOrCreateVolumeForResourceCerts(
		logger lager.Logger,
	) (volume Volume, found bool, err error)

	// LookupVolume has the same signature as Client.LookupVolume.
	LookupVolume(lager.Logger, string) (Volume, bool, error)
}

func NewVolumeClient

func NewVolumeClient(
	baggageclaimClient baggageclaim.Client,
	dbWorker db.Worker,
	clock clock.Clock,

	lockFactory lock.LockFactory,
	dbVolumeRepository db.VolumeRepository,
	dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory,
	dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory,
) VolumeClient

type VolumeLocalityPlacementStrategy

// VolumeLocalityPlacementStrategy has a Choose method (pointer receiver)
// matching ContainerPlacementStrategy.Choose, so
// *VolumeLocalityPlacementStrategy can be used as a
// ContainerPlacementStrategy.
// NOTE(review): presumably prefers workers that already hold volumes for the
// spec's inputs — confirm against Choose.
type VolumeLocalityPlacementStrategy struct {
	// contains filtered or unexported fields
}

func (*VolumeLocalityPlacementStrategy) Choose

func (strategy *VolumeLocalityPlacementStrategy) Choose(workers []Worker, spec ContainerSpec) (Worker, error)

type VolumeMount

// VolumeMount pairs a Volume with the path it is mounted at.
// Container.VolumeMounts returns a slice of these.
type VolumeMount struct {
	Volume    Volume
	MountPath string
}

type VolumeProperties

// VolumeProperties is a string-to-string map used as VolumeSpec.Properties.
// It is a distinct type from baggageclaim.VolumeProperties, which
// Volume.Properties returns.
type VolumeProperties map[string]string

type VolumeSpec

// VolumeSpec describes a volume to create: its baggageclaim strategy, its
// properties, whether it is privileged, and a TTL. It is passed to the
// VolumeClient create/find-or-create methods.
// NOTE(review): the meaning of a zero TTL is not visible here.
type VolumeSpec struct {
	Strategy   baggageclaim.Strategy
	Properties VolumeProperties
	Privileged bool
	TTL        time.Duration
}

type Worker

// Worker is a single worker. It embeds Client, so every Worker can also
// create and find containers and volumes, and adds accessors describing the
// worker itself. NewGardenWorker builds one.
type Worker interface {
	Client

	// ActiveContainers returns a container count as an int.
	ActiveContainers() int

	// Descriptive accessors for the worker.
	Description() string
	Name() string
	ResourceTypes() []atc.WorkerResourceType
	Tags() atc.Tags
	Uptime() time.Duration
	IsOwnedByTeam() bool
	Ephemeral() bool
	// IsVersionCompatible reports a bool for the given version.
	// NOTE(review): the compatibility rule is not visible here.
	IsVersionCompatible(lager.Logger, *version.Version) bool

	// FindVolumeForResourceCache and FindVolumeForTaskCache have the same
	// parameter and result types as the VolumeClient methods of the same
	// names. NOTE(review): the unnamed task-cache arguments are presumably
	// (teamID, jobID, stepName, path) in that order — confirm.
	FindVolumeForResourceCache(logger lager.Logger, resourceCache db.UsedResourceCache) (Volume, bool, error)
	FindVolumeForTaskCache(lager.Logger, int, int, string, string) (Volume, bool, error)

	// CertsVolume returns the worker's certs volume and whether it was found.
	CertsVolume(lager.Logger) (volume Volume, found bool, err error)
	// GardenClient returns a garden.Client.
	GardenClient() garden.Client
}

func NewGardenWorker

func NewGardenWorker(
	gardenClient garden.Client,
	baggageclaimClient baggageclaim.Client,
	containerProvider ContainerProvider,
	volumeClient VolumeClient,
	dbWorker db.Worker,
	clock clock.Clock,
) Worker

type WorkerProvider

// WorkerProvider supplies Workers. NewPool consumes one, and
// NewDBWorkerProvider returns one.
type WorkerProvider interface {
	// RunningWorkers returns a slice of Workers.
	RunningWorkers(lager.Logger) ([]Worker, error)

	// FindWorkerForContainer looks up a worker by team ID and container
	// handle. NOTE(review): the bool presumably reports whether a worker
	// was found.
	FindWorkerForContainer(
		logger lager.Logger,
		teamID int,
		handle string,
	) (Worker, bool, error)

	// FindWorkerForContainerByOwner is the same lookup keyed by container
	// owner instead of handle.
	FindWorkerForContainerByOwner(
		logger lager.Logger,
		teamID int,
		owner db.ContainerOwner,
	) (Worker, bool, error)

	// NewGardenWorker builds a Worker from a saved db.Worker and a clock.
	NewGardenWorker(
		logger lager.Logger,
		tikTok clock.Clock,
		savedWorker db.Worker,
	) Worker
}

func NewDBWorkerProvider

func NewDBWorkerProvider(
	lockFactory lock.LockFactory,
	retryBackOffFactory retryhttp.BackOffFactory,
	imageFactory ImageFactory,
	dbResourceCacheFactory db.ResourceCacheFactory,
	dbResourceConfigFactory db.ResourceConfigFactory,
	dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory,
	dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory,
	dbVolumeRepository db.VolumeRepository,
	dbTeamFactory db.TeamFactory,
	workerFactory db.WorkerFactory,
	workerVersion *version.Version,
	baggageclaimResponseHeaderTimeout time.Duration,
) WorkerProvider

type WorkerSpec

// WorkerSpec is the set of requirements used to select workers: platform,
// resource type, tags and team ID. Client.Satisfying and
// Client.AllSatisfying take one, ContainerSpec.WorkerSpec returns one, and
// Description returns a string for it.
type WorkerSpec struct {
	Platform     string
	ResourceType string
	Tags         []string
	TeamID       int
}

func (WorkerSpec) Description

func (spec WorkerSpec) Description() string

Directories

Path Synopsis
imagefakes
Code generated by counterfeiter.
Code generated by counterfeiter.
transportfakes
Code generated by counterfeiter.
Code generated by counterfeiter.
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL