worker

Version: v6.0.0+incompatible
Published: Dec 13, 2018 License: MIT Imports: 56 Imported by: 3

README

Worker

Worker is the component of Travis CI that will run a CI job on some form of compute instance. It's responsible for getting the bash script from travis-build, spinning up the compute instance (VM, Docker container, or maybe something different), uploading the bash script, running it, and streaming the logs back to travis-logs. It also sends state updates to travis-hub.

Installing

from binary

Find the version you wish to install on the GitHub Releases page and download either the darwin-amd64 binary for macOS or the linux-amd64 binary for Linux. No other operating systems or architectures have pre-built binaries at this time.

from package

Use the ./bin/travis-worker-install script, or take a look at the packagecloud instructions.

from source
  1. install Go v1.7+
  2. clone this down into your $GOPATH
     • mkdir -p $GOPATH/src/github.com/travis-ci
     • git clone https://github.com/travis-ci/worker $GOPATH/src/github.com/travis-ci/worker
     • cd $GOPATH/src/github.com/travis-ci/worker
  3. install gvt:
     • go get -u github.com/FiloSottile/gvt
  4. install gometalinter:
     • go get -u github.com/alecthomas/gometalinter
     • gometalinter --install
  5. install shellcheck
  6. make

Uninstalling

In the case where Travis Worker has been installed via a deb package, there is a convenience script available which may be executed like so:

curl -sSL https://raw.githubusercontent.com/travis-ci/worker/master/bin/travis-worker-uninstall |
  sudo bash -

Configuring Travis Worker

Travis Worker is configured with environment variables or command line flags via the urfave/cli library. A list of the non-dynamic flags and environment variables may be found by invoking the built-in help system:

travis-worker --help

Environment-based image selection configuration

Some backend providers support image selection based on environment variables. The required format uses keys that are prefixed with the provider-specific prefix:

  • TRAVIS_WORKER_{UPPERCASE_PROVIDER}_IMAGE_{UPPERCASE_NAME}: contains an image name string to be used by the backend provider

The following example is for use with the Docker backend:

# matches on `dist: trusty`
export TRAVIS_WORKER_DOCKER_IMAGE_DIST_TRUSTY=travisci/ci-connie:packer-1420290255-fafafaf

# matches on `dist: bionic`
export TRAVIS_WORKER_DOCKER_IMAGE_DIST_BIONIC=registry.business.com/fancy/ubuntu:bionic

# resolves for `language: ruby`
export TRAVIS_WORKER_DOCKER_IMAGE_RUBY=registry.business.com/travisci/ci-ruby:whatever

# resolves for `group: edge` + `language: python`
export TRAVIS_WORKER_DOCKER_IMAGE_GROUP_EDGE_PYTHON=travisci/ci-garnet:packer-1530230255-fafafaf

# used when no dist, language, or group matches
export TRAVIS_WORKER_DOCKER_IMAGE_DEFAULT=travisci/ci-garnet:packer-1410230255-fafafaf

Development: Running Travis Worker locally

This section is for anyone wishing to contribute code to Worker. The code itself should have godoc-compatible docs, which can be viewed on godoc.org: https://godoc.org/github.com/travis-ci/worker. This section is mainly a higher-level overview of the code.

Environment

Ensure you've defined the necessary environment variables (see .example.env).

Pull Docker images

docker pull travisci/ci-amethyst:packer-1504724461
docker tag travisci/ci-amethyst:packer-1504724461 travis:default

Configuration

For configuration, there are some things like the job-board (TRAVIS_WORKER_JOB_BOARD_URL) and travis-build (TRAVIS_WORKER_BUILD_API_URI) URLs that need to be set. These can be set to the staging values.

export TRAVIS_WORKER_JOB_BOARD_URL='https://travis-worker:API_KEY@job-board-staging.travis-ci.com'
export TRAVIS_WORKER_BUILD_API_URI='https://x:API_KEY@build-staging.travis-ci.org/script'

TRAVIS_WORKER_BUILD_API_URI can be found in the env of the job board app, e.g.: heroku config:get JOB_BOARD_BUILD_API_ORG_URL -a job-board-staging.

Images

TODO

Configuring the requested provider/backend

Each provider requires its own configuration, which must be provided via environment variables namespaced by TRAVIS_WORKER_{PROVIDER}_.

Docker

The backend should be configured to be Docker, e.g.:

export TRAVIS_WORKER_PROVIDER_NAME='docker'
export TRAVIS_WORKER_DOCKER_ENDPOINT=unix:///var/run/docker.sock        # or "tcp://localhost:4243"
export TRAVIS_WORKER_DOCKER_PRIVILEGED="false"                          # optional
export TRAVIS_WORKER_DOCKER_CERT_PATH="/etc/secret-docker-cert-stuff"   # optional

Queue configuration

File-based queue

For the queue configuration, there is a file-based queue implementation so you don't have to mess around with RabbitMQ.

You can generate a payload via the generate-job-payload.rb script on travis-scheduler:

heroku run -a travis-scheduler-staging script/generate-job-payload.rb <job id> > payload.json

Place the file in the $TRAVIS_WORKER_QUEUE_NAME/10-created.d/ directory, where it will be picked up by the worker.

See example-payload.json for an example payload.

AMQP-based queue

export TRAVIS_WORKER_QUEUE_TYPE='amqp'
export TRAVIS_WORKER_AMQP_URI='amqp://guest:guest@localhost'

The web interface is accessible at http://localhost:15672/

To verify your messages are being published, try:

rabbitmqadmin get queue=reporting.jobs.builds

Note: You will first need to install rabbitmqadmin. See http://localhost:15672/cli

See script/publish-example-payload for a script to enqueue example-payload.json.

Building and running

Run make build after making any changes. make also executes the test suite.

  1. make
  2. ${GOPATH%%:*}/bin/travis-worker

or in Docker (FIXME):

  1. docker build -t travis-worker . # or docker pull travisci/worker
  2. docker run --env-file ENV_FILE -ti travis-worker # or travisci/worker

Testing

Run make test. To run backend tests matching Docker, for example, run go test -v ./backend -test.run Docker.

Verifying and exporting configuration

To inspect the parsed configuration in a format that can be used as a base environment variable configuration, use the --echo-config flag, which will exit immediately after writing to stdout:

travis-worker --echo-config

Stopping Travis Worker

Travis Worker has two shutdown modes: graceful and immediate. A graceful shutdown tells the worker not to start any additional jobs, but to finish the jobs it is currently running before it shuts down. An immediate shutdown makes the worker stop the jobs it is working on, requeue them, and clean up any open resources (shut down VMs, cleanly close connections, etc.).

To start a graceful shutdown, send an INT signal to the worker (for example using kill -INT). To start an immediate shutdown, send a TERM signal to the worker (for example using kill -TERM).

Go dependency management

Travis Worker is built via the standard go commands, and its dependencies are managed by gvt.

To work with the dependencies, you need to do the following first:

  • Have this repository checked out
  • Install gvt with go get -u github.com/FiloSottile/gvt

Updating existing vendored dependencies

To update an existing vendored dependency, do the following in this directory:

  • gvt update name/of/dependency e.g. gvt update github.com/pkg/sftp

Adding a new dependency

To add a new dependency, do the following:

  • gvt fetch name/of/package e.g. gvt fetch github.com/pkg/sftp

Release process

Since we want to easily keep track of worker changes, we often associate them with a version number. To find out the current version, check the changelog or run travis-worker --version. We typically use semantic versioning to determine how to increase this number.

Once you've decided what the next version number should be, update the changelog, making sure you include all relevant changes that happened since the previous version was tagged. You can see these by running git diff vX.X.X...HEAD, where vX.X.X is the name of the previous version.

Once the changelog has been updated and merged to master, the merge commit needs to be signed and manually tagged with the version number. To do this, run:

git tag --sign -a vX.X.X -m "Worker version vX.X.X"
git push origin vX.X.X

The Travis build corresponding to this push should build and upload a worker image with the new tag to Docker Hub.

The next step is to create a new GitHub release tag with the appropriate information from the changelog.

See LICENSE file.

© 2018 Travis CI GmbH

Documentation

Overview

Package worker implements the core of Worker.

The main coordinating component in the core is the Processor, which is a good place to start when exploring the code. It gets the jobs off the job queue and calls the right things in order to run them. The ProcessorPool starts up the required number of Processors to get the concurrency that's wanted for a single Worker instance.

Constants

const (
	VMTypeDefault = "default"
	VMTypePremium = "premium"
)

Variables

var (
	// LogWriterTick is how often the buffer should be flushed out and sent to
	// travis-logs.
	LogWriterTick = 500 * time.Millisecond

	// LogChunkSize is a bit of a magic number, calculated like this: The
	// maximum Pusher payload is 10 kB (or 10 KiB, who knows, but let's go with
	// 10 kB since that is smaller). Looking at the travis-logs source, the
	// current message overhead (i.e. the part of the payload that isn't
	// the content of the log part) is 42 bytes + the length of the JSON-
	// encoded ID and the length of the JSON-encoded sequence number. A 64-
	// bit number is up to 20 digits long, so that means (assuming we don't
	// go over 64-bit numbers) the overhead is up to 82 bytes. That means
	// we can send up to 9918 bytes of content. However, the JSON-encoded
	// version of a string can be significantly longer than the raw bytes.
	// Worst case that I could find is "<", which with the Go JSON encoder
	// becomes "\u003c" (i.e. six bytes long). So, given a string of just
	// left angle brackets, the string would become six times as long,
	// meaning that the longest string we can take is 1653. We could still
	// get errors if we go over 64-bit numbers, but I find the likeliness
	// of that happening to both the sequence number, the ID, and us maxing
	// out the worst-case logs to be quite unlikely, so I'm willing to live
	// with that. --Sarah
	LogChunkSize = 1653
)
var (
	// VersionString is the git describe version set at build time
	VersionString = "?"
	// RevisionString is the git revision set at build time
	RevisionString = "?"
	// RevisionURLString is the full URL to the revision set at build time
	RevisionURLString = "?"
	// GeneratedString is the build date set at build time
	GeneratedString = "?"
	// CopyrightString is the copyright set at build time
	CopyrightString = "?"
)
var JobCancelledError = errors.New("job cancelled")
var LogWriterTimeout = errors.New("log writer timeout")
var MaxLogLengthExceeded = errors.New("maximum log length exceeded")
var VMConfigDefault = backend.VmConfig{GpuCount: 0, GpuType: ""}

Functions

This section is empty.

Types

type AMQPCanceller added in v1.3.0

type AMQPCanceller struct {
	// contains filtered or unexported fields
}

AMQPCanceller is responsible for listening to a command queue on AMQP and dispatching the commands to the right place. Currently the only valid command is the 'cancel job' command.

func NewAMQPCanceller added in v1.3.0

func NewAMQPCanceller(ctx gocontext.Context, conn *amqp.Connection, cancellationBroadcaster *CancellationBroadcaster) *AMQPCanceller

NewAMQPCanceller creates a new AMQPCanceller. No network traffic occurs until you call Run()

func (*AMQPCanceller) Run added in v1.3.0

func (d *AMQPCanceller) Run()

Run will make the AMQPCanceller listen to the worker command queue and start dispatching any incoming commands.

type AMQPJobQueue added in v1.3.0

type AMQPJobQueue struct {
	DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
	// contains filtered or unexported fields
}

AMQPJobQueue is a JobQueue that uses AMQP

func NewAMQPJobQueue added in v1.3.0

func NewAMQPJobQueue(conn *amqp.Connection, queue string, stateUpdatePoolSize int, sharded bool) (*AMQPJobQueue, error)

NewAMQPJobQueue creates an AMQPJobQueue backed by the given AMQP connection and connects to the AMQP queue with the given name. The queue will be declared in AMQP when this function is called, so an error could be raised if the queue already exists, but with different attributes than we expect.

func (*AMQPJobQueue) Cleanup added in v1.3.0

func (q *AMQPJobQueue) Cleanup() error

Cleanup closes the underlying AMQP connection

func (*AMQPJobQueue) Jobs added in v1.3.0

func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)

Jobs creates a new consumer on the queue and returns a receive-only channel together with an error. The channel is sent every Job that we receive from AMQP.

func (*AMQPJobQueue) Name

func (q *AMQPJobQueue) Name() string

Name returns the name of this queue type, wow!

type AMQPLogWriterFactory

type AMQPLogWriterFactory struct {
	// contains filtered or unexported fields
}

func NewAMQPLogWriterFactory

func NewAMQPLogWriterFactory(conn *amqp.Connection, sharded bool) (*AMQPLogWriterFactory, error)

func (*AMQPLogWriterFactory) Cleanup

func (l *AMQPLogWriterFactory) Cleanup() error

func (*AMQPLogWriterFactory) LogWriter

func (l *AMQPLogWriterFactory) LogWriter(ctx gocontext.Context, defaultLogTimeout time.Duration, job Job) (LogWriter, error)

type BuildPayload

type BuildPayload struct {
	ID     uint64 `json:"id"`
	Number string `json:"number"`
}

BuildPayload contains information about the build.

type BuildScriptGenerator

type BuildScriptGenerator interface {
	Generate(gocontext.Context, Job) ([]byte, error)
}

A BuildScriptGenerator generates a build script for a given job payload.

func NewBuildScriptGenerator

func NewBuildScriptGenerator(cfg *config.Config) BuildScriptGenerator

NewBuildScriptGenerator creates a generator backed by an HTTP API.

type BuildScriptGeneratorError

type BuildScriptGeneratorError struct {

	// true when this error can be recovered by retrying later
	Recover bool
	// contains filtered or unexported fields
}

A BuildScriptGeneratorError is sometimes used by the Generate method on a BuildScriptGenerator to return more metadata about an error.

type BuildTracePersister

type BuildTracePersister interface {
	Persist(gocontext.Context, Job, []byte) error
}

A BuildTracePersister persists a build trace. (duh)

func NewBuildTracePersister

func NewBuildTracePersister(cfg *config.Config) BuildTracePersister

NewBuildTracePersister creates a build trace persister backed by S3

type CLI added in v1.3.0

type CLI struct {
	Config                  *config.Config
	BuildScriptGenerator    BuildScriptGenerator
	BuildTracePersister     BuildTracePersister
	BackendProvider         backend.Provider
	ProcessorPool           *ProcessorPool
	CancellationBroadcaster *CancellationBroadcaster
	JobQueue                JobQueue
	LogWriterFactory        LogWriterFactory
	// contains filtered or unexported fields
}

CLI is the top level of execution for the whole shebang

func NewCLI added in v1.3.0

func NewCLI(c *cli.Context) *CLI

NewCLI creates a new *CLI from a *cli.Context

func (*CLI) Run added in v1.3.0

func (i *CLI) Run()

Run starts all long-running processes and blocks until the processor pool returns from its Run func

func (*CLI) Setup added in v1.3.0

func (i *CLI) Setup() (bool, error)

Setup runs one-time preparatory actions and returns a boolean success value that is used to determine if it is safe to invoke the Run func

type CancellationBroadcaster

type CancellationBroadcaster struct {
	// contains filtered or unexported fields
}

A CancellationBroadcaster allows you to subscribe to and unsubscribe from cancellation messages for a given job ID.

func NewCancellationBroadcaster

func NewCancellationBroadcaster() *CancellationBroadcaster

NewCancellationBroadcaster sets up a new cancellation broadcaster with an empty registry.

func (*CancellationBroadcaster) Broadcast

func (cb *CancellationBroadcaster) Broadcast(id uint64)

Broadcast broadcasts a cancellation message to all currently subscribed cancellers.

func (*CancellationBroadcaster) Subscribe

func (cb *CancellationBroadcaster) Subscribe(id uint64) <-chan struct{}

Subscribe will set up a subscription for cancellation messages for the given job ID. When a cancellation message comes in, the returned channel will be closed.

func (*CancellationBroadcaster) Unsubscribe

func (cb *CancellationBroadcaster) Unsubscribe(id uint64, ch <-chan struct{})

Unsubscribe removes an existing subscription for the channel.
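
The subscribe, broadcast, and unsubscribe behaviour described above can be illustrated with a small stand-alone type. This is a simplified sketch and not the package's implementation: the broadcaster type below is hypothetical, and the only behaviour taken from the documentation is that a subscriber's channel is closed when a cancellation for its job ID arrives.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical, simplified stand-in for the
// CancellationBroadcaster described above.
type broadcaster struct {
	mu   sync.Mutex
	subs map[uint64][]chan struct{}
}

func newBroadcaster() *broadcaster {
	return &broadcaster{subs: make(map[uint64][]chan struct{})}
}

// Subscribe registers interest in cancellations for the given job ID.
func (b *broadcaster) Subscribe(id uint64) <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan struct{})
	b.subs[id] = append(b.subs[id], ch)
	return ch
}

// Unsubscribe removes a single subscription without closing its channel.
func (b *broadcaster) Unsubscribe(id uint64, ch <-chan struct{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	subs := b.subs[id]
	for i, c := range subs {
		if (<-chan struct{})(c) == ch {
			b.subs[id] = append(subs[:i], subs[i+1:]...)
			break
		}
	}
	if len(b.subs[id]) == 0 {
		delete(b.subs, id)
	}
}

// Broadcast closes every channel subscribed to the given job ID.
func (b *broadcaster) Broadcast(id uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, c := range b.subs[id] {
		close(c)
	}
	delete(b.subs, id)
}

func main() {
	b := newBroadcaster()
	ch := b.Subscribe(42)
	b.Broadcast(42)
	<-ch // returns immediately because the channel was closed
	fmt.Println("job 42 cancelled")
}
```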

type EnvVar

type EnvVar struct {
	Name   string
	Public bool
	Value  string
}

type FileJobQueue added in v1.3.0

type FileJobQueue struct {
	DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
	// contains filtered or unexported fields
}

FileJobQueue is a JobQueue that uses directories for input, state, and output

func NewFileJobQueue added in v1.3.0

func NewFileJobQueue(baseDir, queue string, pollingInterval time.Duration) (*FileJobQueue, error)

NewFileJobQueue creates a *FileJobQueue from a base directory and queue name

func (*FileJobQueue) Cleanup added in v1.3.0

func (f *FileJobQueue) Cleanup() error

Cleanup is a no-op

func (*FileJobQueue) Jobs added in v1.3.0

func (f *FileJobQueue) Jobs(ctx gocontext.Context) (<-chan Job, error)

Jobs returns a channel of jobs from the created directory

func (*FileJobQueue) Name

func (q *FileJobQueue) Name() string

Name returns the name of this queue type, wow!

type FinishState

type FinishState string

FinishState is the state that a job finished with (such as pass/fail/etc.). You should not provide a string directly, but use one of the FinishStateX constants defined in this package.

const (
	FinishStatePassed    FinishState = "passed"
	FinishStateFailed    FinishState = "failed"
	FinishStateErrored   FinishState = "errored"
	FinishStateCancelled FinishState = "cancelled"
)

Valid finish states for the FinishState type

type HTTPJobQueue

type HTTPJobQueue struct {
	DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
	// contains filtered or unexported fields
}

HTTPJobQueue is a JobQueue that uses http

func NewHTTPJobQueue

func NewHTTPJobQueue(jobBoardURL *url.URL, site, providerName, queue string,
	cb *CancellationBroadcaster) (*HTTPJobQueue, error)

NewHTTPJobQueue creates a new http job queue

func NewHTTPJobQueueWithIntervals

func NewHTTPJobQueueWithIntervals(jobBoardURL *url.URL, site, providerName, queue string,
	pollInterval, refreshClaimInterval time.Duration,
	cb *CancellationBroadcaster) (*HTTPJobQueue, error)

NewHTTPJobQueueWithIntervals creates a new http job queue with the specified poll and refresh claim intervals

func (*HTTPJobQueue) Cleanup

func (q *HTTPJobQueue) Cleanup() error

Cleanup does not do anything!

func (*HTTPJobQueue) Jobs

func (q *HTTPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)

Jobs consumes new jobs from job-board

func (*HTTPJobQueue) Name

func (q *HTTPJobQueue) Name() string

Name returns the name of this queue type, wow!

type Job

type Job interface {
	Payload() *JobPayload
	RawPayload() *simplejson.Json
	StartAttributes() *backend.StartAttributes
	FinishState() FinishState
	Requeued() bool

	Received(gocontext.Context) error
	Started(gocontext.Context) error
	Error(gocontext.Context, string) error
	Requeue(gocontext.Context) error
	Finish(gocontext.Context, FinishState) error

	LogWriter(gocontext.Context, time.Duration) (LogWriter, error)
	Name() string
	SetupContext(gocontext.Context) gocontext.Context
}

A Job ties together all the elements required for a build job

type JobJobPayload

type JobJobPayload struct {
	ID       uint64     `json:"id"`
	Number   string     `json:"number"`
	QueuedAt *time.Time `json:"queued_at"`
}

JobJobPayload contains information about the job.

type JobMetaPayload

type JobMetaPayload struct {
	StateUpdateCount uint `json:"state_update_count"`
}

JobMetaPayload contains meta information about the job.

type JobPayload

type JobPayload struct {
	Type       string                 `json:"type"`
	Job        JobJobPayload          `json:"job"`
	Build      BuildPayload           `json:"source"`
	Repository RepositoryPayload      `json:"repository"`
	UUID       string                 `json:"uuid"`
	Config     map[string]interface{} `json:"config"`
	Timeouts   TimeoutsPayload        `json:"timeouts,omitempty"`
	VMType     string                 `json:"vm_type"`
	VMConfig   backend.VmConfig       `json:"vm_config"`
	Meta       JobMetaPayload         `json:"meta"`
	Queue      string                 `json:"queue"`
	Trace      bool                   `json:"trace"`
	Warmer     bool                   `json:"warmer"`
}

JobPayload is the payload we receive over RabbitMQ.
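
Decoding such a payload with encoding/json might look like the sketch below. The jobPayload type is a trimmed, hypothetical copy that keeps only a few of the fields listed above, and the sample JSON values are made up for illustration. Note that the Build field is read from the "source" key, as the struct tags above indicate.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobPayload is a trimmed, hypothetical copy of JobPayload that uses the
// same JSON tags for the fields it keeps.
type jobPayload struct {
	Type string `json:"type"`
	Job  struct {
		ID     uint64 `json:"id"`
		Number string `json:"number"`
	} `json:"job"`
	Build struct {
		ID     uint64 `json:"id"`
		Number string `json:"number"`
	} `json:"source"` // the build lives under the "source" key
	Repository struct {
		ID   uint64 `json:"id"`
		Slug string `json:"slug"`
	} `json:"repository"`
	Queue string `json:"queue"`
}

// samplePayload contains invented values for illustration only.
const samplePayload = `{
  "type": "test",
  "job": {"id": 42, "number": "7.1"},
  "source": {"id": 41, "number": "7"},
  "repository": {"id": 3, "slug": "example/repo"},
  "queue": "builds.example"
}`

// decodePayload parses raw JSON into the trimmed payload type.
func decodePayload(data []byte) (*jobPayload, error) {
	var p jobPayload
	if err := json.Unmarshal(data, &p); err != nil {
		return nil, err
	}
	return &p, nil
}

func main() {
	p, err := decodePayload([]byte(samplePayload))
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Repository.Slug, p.Build.Number, p.Job.Number, p.Queue)
}
```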

type JobQueue

type JobQueue interface {
	Jobs(gocontext.Context) (<-chan Job, error)
	Name() string
	Cleanup() error
}

JobQueue is the minimal interface needed by a ProcessorPool

type JobStartedMeta

type JobStartedMeta struct {
	QueuedAt *time.Time `json:"queued_at"`
	Repo     string     `json:"repo"`
	Queue    string     `json:"queue"`
	Infra    string     `json:"infra"`
}

JobStartedMeta is metadata that is useful for computing time to first log line downstream, and breaking it down into further dimensions.

type LogWriter

type LogWriter interface {
	io.WriteCloser
	WriteAndClose([]byte) (int, error)
	Timeout() <-chan time.Time
	SetMaxLogLength(int)
	SetJobStarted(meta *JobStartedMeta)
	SetCancelFunc(gocontext.CancelFunc)
	MaxLengthReached() bool
}

LogWriter is primarily an io.Writer that will send all bytes to travis-logs for processing, and also has some utility methods for timeouts and log length limiting. Each LogWriter is tied to a given job, and can be obtained by calling the LogWriter() method on a Job.

type LogWriterFactory

type LogWriterFactory interface {
	LogWriter(gocontext.Context, time.Duration, Job) (LogWriter, error)
	Cleanup() error
}

type MultiSourceJobQueue

type MultiSourceJobQueue struct {
	// contains filtered or unexported fields
}

func NewMultiSourceJobQueue

func NewMultiSourceJobQueue(queues ...JobQueue) *MultiSourceJobQueue

func (*MultiSourceJobQueue) Cleanup

func (msjq *MultiSourceJobQueue) Cleanup() error

Cleanup runs cleanup for each source queue

func (*MultiSourceJobQueue) Jobs

func (msjq *MultiSourceJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)

Jobs returns a Job channel that selects over each source queue Job channel

func (*MultiSourceJobQueue) Name

func (msjq *MultiSourceJobQueue) Name() string

Name builds a name from each source queue name

type Processor

type Processor struct {
	ID string

	// ProcessedCount contains the number of jobs that have been processed
	// by this Processor. This value should not be modified outside of the
	// Processor.
	ProcessedCount int

	// CurrentStatus contains the current status of the processor, and can
	// be one of "new", "waiting", "processing" or "done".
	CurrentStatus string

	// LastJobID contains the ID of the last job the processor processed.
	LastJobID uint64
	// contains filtered or unexported fields
}

A Processor gets jobs off the job queue and coordinates running them with other components.

func NewProcessor

func NewProcessor(ctx gocontext.Context, hostname string, queue JobQueue,
	logWriterFactory LogWriterFactory, provider backend.Provider, generator BuildScriptGenerator, persister BuildTracePersister, cancellationBroadcaster *CancellationBroadcaster,
	config ProcessorConfig) (*Processor, error)

NewProcessor creates a new processor that will run the build jobs on the given channel using the given provider and getting build scripts from the generator.

func (*Processor) GracefulShutdown

func (p *Processor) GracefulShutdown()

GracefulShutdown tells the processor to finish the job it is currently processing, but not pick up any new jobs. This method will return immediately; the processor is done when Run() returns.

func (*Processor) Run

func (p *Processor) Run()

Run starts the processor. This method will not return until the processor is terminated, either by calling the GracefulShutdown or Terminate methods, or if the build jobs channel is closed.

func (*Processor) Terminate

func (p *Processor) Terminate()

Terminate tells the processor to stop working on the current job as soon as possible.

type ProcessorConfig

type ProcessorConfig struct {
	Config *config.Config
}

type ProcessorPool

type ProcessorPool struct {
	Context                 gocontext.Context
	Provider                backend.Provider
	Generator               BuildScriptGenerator
	Persister               BuildTracePersister
	CancellationBroadcaster *CancellationBroadcaster
	Hostname                string
	Config                  *config.Config
	// contains filtered or unexported fields
}

A ProcessorPool spins up multiple Processors handling build jobs from the same queue.

func NewProcessorPool

func NewProcessorPool(ppc *ProcessorPoolConfig,
	provider backend.Provider, generator BuildScriptGenerator, persister BuildTracePersister,
	cancellationBroadcaster *CancellationBroadcaster) *ProcessorPool

NewProcessorPool creates a new processor pool using the given arguments.

func (*ProcessorPool) Decr added in v1.3.0

func (p *ProcessorPool) Decr()

Decr pops a processor out of the pool and issues a graceful shutdown

func (*ProcessorPool) Each added in v1.3.0

func (p *ProcessorPool) Each(f func(int, *Processor))

Each loops through all the processors in the pool and calls the given function for each of them, passing in the index and the processor. The order of the processors is the same for the same set of processors.

func (*ProcessorPool) ExpectedSize

func (p *ProcessorPool) ExpectedSize() int

ExpectedSize returns the size of the pool once gracefully shut down processors complete.

After calling SetSize, ExpectedSize will soon reflect the requested new size, while Size will include processors that are still processing their last job before shutting down.

func (*ProcessorPool) GracefulShutdown

func (p *ProcessorPool) GracefulShutdown(togglePause bool)

GracefulShutdown causes each processor in the pool to start its graceful shutdown.

func (*ProcessorPool) Incr added in v1.3.0

func (p *ProcessorPool) Incr()

Incr adds a single running processor to the pool

func (*ProcessorPool) Ready

func (p *ProcessorPool) Ready() bool

Ready returns true if the processor pool is running as expected. Returns false if the processor pool has not been started yet.

func (*ProcessorPool) Run

func (p *ProcessorPool) Run(poolSize int, queue JobQueue, logWriterFactory LogWriterFactory) error

Run starts up a number of processors and connects them to the given queue. This method stalls until all processors have finished.

func (*ProcessorPool) SetSize

func (p *ProcessorPool) SetSize(newSize int)

SetSize adjusts the pool to run the given number of processors.

This operates in an eventually consistent manner. Because some workers may be running jobs, we may not be able to immediately adjust the pool size. Once jobs finish, the pool size should rest at the given value.

func (*ProcessorPool) Size added in v1.3.0

func (p *ProcessorPool) Size() int

Size returns the number of processors that are currently running.

This includes processors that are in the process of gracefully shutting down. It's important to track these because they are still running jobs and thus still using resources that need to be managed and tracked.

func (*ProcessorPool) TotalProcessed

func (p *ProcessorPool) TotalProcessed() int

TotalProcessed returns the sum of all processor ProcessedCount values.

type ProcessorPoolConfig

type ProcessorPoolConfig struct {
	Hostname string
	Context  gocontext.Context
	Config   *config.Config
}

type RemoteController

type RemoteController struct {
	// contains filtered or unexported fields
}

RemoteController provides an HTTP API for controlling worker.

func (*RemoteController) CheckAuth

func (api *RemoteController) CheckAuth(next http.Handler) http.Handler

CheckAuth is a middleware for all HTTP API methods that ensures that the configured basic auth credentials were passed in the request.

func (*RemoteController) DecrementPool

func (api *RemoteController) DecrementPool(w http.ResponseWriter, req *http.Request)

DecrementPool tells the worker to gracefully shut down a processor.

func (*RemoteController) GetWorkerInfo

func (api *RemoteController) GetWorkerInfo(w http.ResponseWriter, req *http.Request)

GetWorkerInfo writes a JSON payload with useful information about the current state of worker as a whole.

func (*RemoteController) HealthCheck

func (api *RemoteController) HealthCheck(w http.ResponseWriter, req *http.Request)

HealthCheck indicates whether worker is currently functioning in a healthy way. This can be used by a system like Kubernetes to determine whether to replace an instance of worker with a new one.

func (*RemoteController) IncrementPool

func (api *RemoteController) IncrementPool(w http.ResponseWriter, req *http.Request)

IncrementPool tells the worker to spin up another processor.

func (*RemoteController) ReadyCheck

func (api *RemoteController) ReadyCheck(w http.ResponseWriter, req *http.Request)

ReadyCheck indicates whether the worker is ready to receive requests. This is intended to be used as a readiness check in a system like Kubernetes. We should not attempt to interact with the remote controller until this returns a successful status.

func (*RemoteController) SetContext

func (api *RemoteController) SetContext(next http.Handler) http.Handler

SetContext is a middleware function that loads some values into the request context. This allows these values to be shown in logging.

func (*RemoteController) Setup

func (api *RemoteController) Setup()

Setup installs the HTTP routes that will handle requests to the HTTP API.

func (*RemoteController) ShutdownWorker

func (api *RemoteController) ShutdownWorker(w http.ResponseWriter, req *http.Request)

ShutdownWorker tells the worker to shut down.

Options can be passed in the body that determine whether the shutdown is done gracefully or not.

func (*RemoteController) UpdateWorkerInfo

func (api *RemoteController) UpdateWorkerInfo(w http.ResponseWriter, req *http.Request)

UpdateWorkerInfo allows reconfiguring some parts of worker on the fly.

The main use of this is adjusting the size of the processor pool without interrupting existing running jobs.

type RepositoryPayload

type RepositoryPayload struct {
	ID   uint64 `json:"id"`
	Slug string `json:"slug"`
}

RepositoryPayload contains information about the repository.

type SentryHook added in v1.3.0

type SentryHook struct {
	Timeout time.Duration
	// contains filtered or unexported fields
}

SentryHook delivers logs to a sentry server

func NewSentryHook added in v1.3.0

func NewSentryHook(DSN string, levels []logrus.Level) (*SentryHook, error)

NewSentryHook creates a hook to be added to an instance of logger and initializes the raven client. This method sets the timeout to 100 milliseconds.

func (*SentryHook) Fire added in v1.3.0

func (hook *SentryHook) Fire(entry *logrus.Entry) error

Fire is called when an event should be sent to sentry

func (*SentryHook) Levels added in v1.3.0

func (hook *SentryHook) Levels() []logrus.Level

Levels returns the available logging levels.

type TimeoutsPayload added in v1.3.0

type TimeoutsPayload struct {
	HardLimit  uint64 `json:"hard_limit"`
	LogSilence uint64 `json:"log_silence"`
}

TimeoutsPayload contains information about any custom timeouts. The timeouts are given in seconds, and a value of 0 means no custom timeout is set.

Directories

Path       Synopsis
backend    Package backend provides the compute instance backends supported by Worker.
cmd
config     Package config contains the flags and defaults for Worker configuration.
context    Package context contains functions to embed data in a context.
image      Package image contains logic for image selection.
metrics    Package metrics provides easy methods to send metrics.
ratelimit  Package ratelimit implements a rate limiter to avoid calling APIs too often.
