README ¶
Worker
Worker is the component of Travis CI that will run a CI job on some form of compute instance. It's responsible for getting the bash script from travis-build, spinning up the compute instance (VM, Docker container, or maybe something different), uploading the bash script, running it, and streaming the logs back to travis-logs. It also sends state updates to travis-hub.
Installing
from binary
Find the version you wish to install on the GitHub Releases
page and download either the
darwin-amd64
binary for macOS or the linux-amd64
binary for Linux. No other
operating systems or architectures have pre-built binaries at this time.
from package
Use the ./bin/travis-worker-install
script,
or take a look at the packagecloud
instructions.
from source
- install Go
v1.7+
- clone this down into your
$GOPATH
mkdir -p $GOPATH/src/github.com/travis-ci
git clone https://github.com/travis-ci/worker $GOPATH/src/github.com/travis-ci/worker
cd $GOPATH/src/github.com/travis-ci/worker
- install gvt:
go get -u github.com/FiloSottile/gvt
- install gometalinter:
go get -u github.com/alecthomas/gometalinter
gometalinter --install
- install shellcheck
make
Uninstalling
In the case where Travis Worker has been installed via a deb package, there is a convenience script available which may be executed like so:
curl -sSL https://raw.githubusercontent.com/travis-ci/worker/master/bin/travis-worker-uninstall |
sudo bash -
Configuring Travis Worker
Travis Worker is configured with environment variables or command line flags via the urfave/cli library. A list of the non-dynamic flags and environment variables may be found by invoking the built-in help system:
travis-worker --help
Environment-based image selection configuration
Some backend providers support image selection based on environment variables. The required format uses keys that are prefixed with the provider-specific prefix:
TRAVIS_WORKER_{UPPERCASE_PROVIDER}_IMAGE_{UPPERCASE_NAME}
: contains an image name string to be used by the backend provider
The following example is for use with the Docker backend:
# matches on `dist: trusty`
export TRAVIS_WORKER_DOCKER_IMAGE_DIST_TRUSTY=travisci/ci-connie:packer-1420290255-fafafaf
# matches on `dist: bionic`
export TRAVIS_WORKER_DOCKER_IMAGE_DIST_BIONIC=registry.business.com/fancy/ubuntu:bionic
# resolves for `language: ruby`
export TRAVIS_WORKER_DOCKER_IMAGE_RUBY=registry.business.com/travisci/ci-ruby:whatever
# resolves for `group: edge` + `language: python`
export TRAVIS_WORKER_DOCKER_IMAGE_GROUP_EDGE_PYTHON=travisci/ci-garnet:packer-1530230255-fafafaf
# used when no dist, language, or group matches
export TRAVIS_WORKER_DOCKER_IMAGE_DEFAULT=travisci/ci-garnet:packer-1410230255-fafafaf
Development: Running Travis Worker locally
This section is for anyone wishing to contribute code to Worker. The code itself should have godoc-compatible docs (which can be viewed on godoc.org: https://godoc.org/github.com/travis-ci/worker), this is mainly a higher-level overview of the code.
Environment
Ensure you've defined the necessary environment variables (see .example.env
).
Pull Docker images
docker pull travisci/ci-amethyst:packer-1504724461
docker tag travisci/ci-amethyst:packer-1504724461 travis:default
Configuration
For configuration, there are some things like the job-board (TRAVIS_WORKER_JOB_BOARD_URL
)
and travis-build (TRAVIS_WORKER_BUILD_API_URI
) URLs that need to be set. These
can be set to the staging values.
export TRAVIS_WORKER_JOB_BOARD_URL='https://travis-worker:API_KEY@job-board-staging.travis-ci.com'
export TRAVIS_WORKER_BUILD_API_URI='https://x:API_KEY@build-staging.travis-ci.org/script'
TRAVIS_WORKER_BUILD_API_URI
can be found in the env of the job board app, e.g.:
heroku config:get JOB_BOARD_BUILD_API_ORG_URL -a job-board-staging
.
Images
TODO
Configuring the requested provider/backend
Each provider requires its own configuration, which must be provided via
environment variables namespaced by TRAVIS_WORKER_{PROVIDER}_
.
The backend should be configured to be Docker, e.g.:
export TRAVIS_WORKER_PROVIDER_NAME='docker'
export TRAVIS_WORKER_DOCKER_ENDPOINT=unix:///var/run/docker.sock # or "tcp://localhost:4243"
export TRAVIS_WORKER_DOCKER_PRIVILEGED="false" # optional
export TRAVIS_WORKER_DOCKER_CERT_PATH="/etc/secret-docker-cert-stuff" # optional
Queue configuration
File-based queue
For the queue configuration, there is a file-based queue implementation so you don't have to mess around with RabbitMQ.
You can generate a payload via the generate-job-payload.rb
script on travis-scheduler:
heroku run -a travis-scheduler-staging script/generate-job-payload.rb <job id> > payload.json
Place the file in the $TRAVIS_WORKER_QUEUE_NAME/10-created.d/
directory, where
it will be picked up by the worker.
See example-payload.json
for an example payload.
AMQP-based queue
export TRAVIS_WORKER_QUEUE_TYPE='amqp'
export TRAVIS_WORKER_AMQP_URI='amqp://guest:guest@localhost'
The web interface is accessible at http://localhost:15672/
To verify your messages are being published, try:
rabbitmqadmin get queue=reporting.jobs.builds
Note: You will first need to install rabbitmqadmin
. See http://localhost:15672/cli
See script/publish-example-payload
for a script to enqueue example-payload.json
.
Building and running
Run make build
after making any changes. make
also executes the test suite.
make
${GOPATH%%:*}/bin/travis-worker
or in Docker (FIXME):
docker build -t travis-worker .
# ordocker pull travisci/worker
docker run --env-file ENV_FILE -ti travis-worker
# ortravisci/worker
Testing
Run make test
. To run backend tests matching Docker
, for example, run
go test -v ./backend -test.run Docker
.
Verifying and exporting configuration
To inspect the parsed configuration in a format that can be used as a base
environment variable configuration, use the --echo-config
flag, which will
exit immediately after writing to stdout:
travis-worker --echo-config
Stopping Travis Worker
Travis Worker has two shutdown modes: Graceful and immediate. The graceful shutdown will tell the worker to not start any additional jobs but finish the jobs it is currently running before it shuts down. The immediate shutdown will make the worker stop the jobs it's working on, requeue them, and clean up any open resources (shut down VMs, cleanly close connections, etc.)
To start a graceful shutdown, send an INT signal to the worker (for example
using kill -INT
). To start an immediate shutdown, send a TERM signal to the
worker (for example using kill -TERM
).
Go dependency management
Travis Worker is built via the standard go
commands and dependencies managed
by gvt
.
To work with the dependencies you need to do the following first
- Have this repository checked out
- Install
gvt
withgithub.com/FiloSottile/gvt
Updating existing vendored dependencies
To update an existing vendored dependency, do the following in this directory:
gvt update name/of/dependency
e.g.gvt update github.com/pkg/sftp
Adding a new dependency
To add a new dependency, do the following:
gvt fetch name/of/package
e.g.gvt fetch github.com/pkg/sftp
Release process
Since we want to easily keep track of worker changes, we often associate them with a version number.
To find out the current version, check the changelog or run travis-worker --version
.
We typically use semantic versioning to determine how to increase this number.
Once you've decided what the next version number should be, update the changelog making sure you include all relevant changes that happened since the previous version was tagged. You can see these by running git diff vX.X.X...HEAD
, where v.X.X.X
is the name of the previous version.
Once the changelog has been updated and merged to master
, the merge commit needs to be signed and manually tagged with the version number. To do this, run:
git tag --sign -a vX.X.X -m "Worker version vX.X.X"
git push origin vX.X.X
The Travis build corresponding to this push should build and upload a worker image with the new tag to Dockerhub.
The next step is to create a new Github release tag with the appropriate information from the changelog.
License and Copyright Information
See LICENSE file.
© 2018 Travis CI GmbH
Documentation ¶
Overview ¶
Package worker implements the core of Worker.
The main coordinating component in the core is the Processor, and is a good place to start when exploring the code. It gets the jobs off the job queue and calls the right things in order to run it. The ProcessorPool starts up the required number of Processors to get the concurrency that's wanted for a single Worker instance.
Index ¶
- Constants
- Variables
- type AMQPCanceller
- type AMQPJobQueue
- type AMQPLogWriterFactory
- type BuildPayload
- type BuildScriptGenerator
- type BuildScriptGeneratorError
- type BuildTracePersister
- type CLI
- type CancellationBroadcaster
- type EnvVar
- type FileJobQueue
- type FinishState
- type HTTPJobQueue
- type Job
- type JobJobPayload
- type JobMetaPayload
- type JobPayload
- type JobQueue
- type JobStartedMeta
- type LogWriter
- type LogWriterFactory
- type MultiSourceJobQueue
- type Processor
- type ProcessorConfig
- type ProcessorPool
- func (p *ProcessorPool) Decr()
- func (p *ProcessorPool) Each(f func(int, *Processor))
- func (p *ProcessorPool) ExpectedSize() int
- func (p *ProcessorPool) GracefulShutdown(togglePause bool)
- func (p *ProcessorPool) Incr()
- func (p *ProcessorPool) Ready() bool
- func (p *ProcessorPool) Run(poolSize int, queue JobQueue, logWriterFactory LogWriterFactory) error
- func (p *ProcessorPool) SetSize(newSize int)
- func (p *ProcessorPool) Size() int
- func (p *ProcessorPool) TotalProcessed() int
- type ProcessorPoolConfig
- type RemoteController
- func (api *RemoteController) CheckAuth(next http.Handler) http.Handler
- func (api *RemoteController) DecrementPool(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) GetWorkerInfo(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) HealthCheck(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) IncrementPool(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) ReadyCheck(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) SetContext(next http.Handler) http.Handler
- func (api *RemoteController) Setup()
- func (api *RemoteController) ShutdownWorker(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) UpdateWorkerInfo(w http.ResponseWriter, req *http.Request)
- type RepositoryPayload
- type SentryHook
- type TimeoutsPayload
Constants ¶
const ( VMTypeDefault = "default" VMTypePremium = "premium" )
Variables ¶
var ( // LogWriterTick is how often the buffer should be flushed out and sent to // travis-logs. LogWriterTick = 500 * time.Millisecond // LogChunkSize is a bit of a magic number, calculated like this: The // maximum Pusher payload is 10 kB (or 10 KiB, who knows, but let's go with // 10 kB since that is smaller). Looking at the travis-logs source, the // current message overhead (i.e. the part of the payload that isn't // the content of the log part) is 42 bytes + the length of the JSON- // encoded ID and the length of the JSON-encoded sequence number. A 64- // bit number is up to 20 digits long, so that means (assuming we don't // go over 64-bit numbers) the overhead is up to 82 bytes. That means // we can send up to 9918 bytes of content. However, the JSON-encoded // version of a string can be significantly longer than the raw bytes. // Worst case that I could find is "<", which with the Go JSON encoder // becomes "\u003c" (i.e. six bytes long). So, given a string of just // left angle brackets, the string would become six times as long, // meaning that the longest string we can take is 1653. We could still // get errors if we go over 64-bit numbers, but I find the likeliness // of that happening to both the sequence number, the ID, and us maxing // out the worst-case logs to be quite unlikely, so I'm willing to live // with that. --Sarah LogChunkSize = 1653 )
var ( // VersionString is the git describe version set at build time VersionString = "?" // RevisionString is the git revision set at build time RevisionString = "?" // RevisionURLString is the full URL to the revision set at build time RevisionURLString = "?" // GeneratedString is the build date set at build time GeneratedString = "?" // CopyrightString is the copyright set at build time CopyrightString = "?" )
var JobCancelledError = errors.New("job cancelled")
var LogWriterTimeout = errors.New("log writer timeout")
var MaxLogLengthExceeded = errors.New("maximum log length exceeded")
var VMConfigDefault = backend.VmConfig{GpuCount: 0, GpuType: ""}
Functions ¶
This section is empty.
Types ¶
type AMQPCanceller ¶ added in v1.3.0
type AMQPCanceller struct {
// contains filtered or unexported fields
}
AMQPCanceller is responsible for listening to a command queue on AMQP and dispatching the commands to the right place. Currently the only valid command is the 'cancel job' command.
func NewAMQPCanceller ¶ added in v1.3.0
func NewAMQPCanceller(ctx gocontext.Context, conn *amqp.Connection, cancellationBroadcaster *CancellationBroadcaster) *AMQPCanceller
NewAMQPCanceller creates a new AMQPCanceller. No network traffic occurs until you call Run()
func (*AMQPCanceller) Run ¶ added in v1.3.0
func (d *AMQPCanceller) Run()
Run will make the AMQPCanceller listen to the worker command queue and start dispatching any incoming commands.
type AMQPJobQueue ¶ added in v1.3.0
type AMQPJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
AMQPJobQueue is a JobQueue that uses AMQP
func NewAMQPJobQueue ¶ added in v1.3.0
func NewAMQPJobQueue(conn *amqp.Connection, queue string, stateUpdatePoolSize int, sharded bool) (*AMQPJobQueue, error)
NewAMQPJobQueue creates a AMQPJobQueue backed by the given AMQP connections and connects to the AMQP queue with the given name. The queue will be declared in AMQP when this function is called, so an error could be raised if the queue already exists, but with different attributes than we expect.
func (*AMQPJobQueue) Cleanup ¶ added in v1.3.0
func (q *AMQPJobQueue) Cleanup() error
Cleanup closes the underlying AMQP connection
func (*AMQPJobQueue) Jobs ¶ added in v1.3.0
func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs creates a new consumer on the queue, and returns three channels. The first channel gets sent every BuildJob that we receive from AMQP. The stopChan is a channel that can be closed in order to stop the consumer.
func (*AMQPJobQueue) Name ¶
func (q *AMQPJobQueue) Name() string
Name returns the name of this queue type, wow!
type AMQPLogWriterFactory ¶
type AMQPLogWriterFactory struct {
// contains filtered or unexported fields
}
func NewAMQPLogWriterFactory ¶
func NewAMQPLogWriterFactory(conn *amqp.Connection, sharded bool) (*AMQPLogWriterFactory, error)
func (*AMQPLogWriterFactory) Cleanup ¶
func (l *AMQPLogWriterFactory) Cleanup() error
type BuildPayload ¶
BuildPayload contains information about the build.
type BuildScriptGenerator ¶
A BuildScriptGenerator generates a build script for a given job payload.
func NewBuildScriptGenerator ¶
func NewBuildScriptGenerator(cfg *config.Config) BuildScriptGenerator
NewBuildScriptGenerator creates a generator backed by an HTTP API.
type BuildScriptGeneratorError ¶
type BuildScriptGeneratorError struct { // true when this error can be recovered by retrying later Recover bool // contains filtered or unexported fields }
A BuildScriptGeneratorError is sometimes used by the Generate method on a BuildScriptGenerator to return more metadata about an error.
type BuildTracePersister ¶
A BuildTracePersister persists a build trace. (duh)
func NewBuildTracePersister ¶
func NewBuildTracePersister(cfg *config.Config) BuildTracePersister
NewBuildTracePersister creates a build trace persister backed by S3
type CLI ¶ added in v1.3.0
type CLI struct { Config *config.Config BuildScriptGenerator BuildScriptGenerator BuildTracePersister BuildTracePersister BackendProvider backend.Provider ProcessorPool *ProcessorPool CancellationBroadcaster *CancellationBroadcaster JobQueue JobQueue LogWriterFactory LogWriterFactory // contains filtered or unexported fields }
CLI is the top level of execution for the whole shebang
type CancellationBroadcaster ¶
type CancellationBroadcaster struct {
// contains filtered or unexported fields
}
A CancellationBroadcaster allows you to subscribe to and unsubscribe from cancellation messages for a given job ID.
func NewCancellationBroadcaster ¶
func NewCancellationBroadcaster() *CancellationBroadcaster
NewCancellationBroadcaster sets up a new cancellation broadcaster with an empty registry.
func (*CancellationBroadcaster) Broadcast ¶
func (cb *CancellationBroadcaster) Broadcast(id uint64)
Broadcast broacasts a cancellation message to all currently subscribed cancellers.
func (*CancellationBroadcaster) Subscribe ¶
func (cb *CancellationBroadcaster) Subscribe(id uint64) <-chan struct{}
Subscribe will set up a subscription for cancellation messages for the given job ID. When a cancellation message comes in, the returned channel will be closed.
func (*CancellationBroadcaster) Unsubscribe ¶
func (cb *CancellationBroadcaster) Unsubscribe(id uint64, ch <-chan struct{})
Unsubscribe removes an existing subscription for the channel.
type FileJobQueue ¶ added in v1.3.0
type FileJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
FileJobQueue is a JobQueue that uses directories for input, state, and output
func NewFileJobQueue ¶ added in v1.3.0
func NewFileJobQueue(baseDir, queue string, pollingInterval time.Duration) (*FileJobQueue, error)
NewFileJobQueue creates a *FileJobQueue from a base directory and queue name
func (*FileJobQueue) Cleanup ¶ added in v1.3.0
func (f *FileJobQueue) Cleanup() error
Cleanup is a no-op
func (*FileJobQueue) Jobs ¶ added in v1.3.0
func (f *FileJobQueue) Jobs(ctx gocontext.Context) (<-chan Job, error)
Jobs returns a channel of jobs from the created directory
func (*FileJobQueue) Name ¶
func (q *FileJobQueue) Name() string
Name returns the name of this queue type, wow!
type FinishState ¶
type FinishState string
FinishState is the state that a job finished with (such as pass/fail/etc.). You should not provide a string directly, but use one of the FinishStateX constants defined in this package.
const ( FinishStatePassed FinishState = "passed" FinishStateFailed FinishState = "failed" FinishStateErrored FinishState = "errored" FinishStateCancelled FinishState = "cancelled" )
Valid finish states for the FinishState type
type HTTPJobQueue ¶
type HTTPJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
HTTPJobQueue is a JobQueue that uses http
func NewHTTPJobQueue ¶
func NewHTTPJobQueue(jobBoardURL *url.URL, site, providerName, queue string, cb *CancellationBroadcaster) (*HTTPJobQueue, error)
NewHTTPJobQueue creates a new http job queue
func NewHTTPJobQueueWithIntervals ¶
func NewHTTPJobQueueWithIntervals(jobBoardURL *url.URL, site, providerName, queue string, pollInterval, refreshClaimInterval time.Duration, cb *CancellationBroadcaster) (*HTTPJobQueue, error)
NewHTTPJobQueueWithIntervals creates a new http job queue with the specified poll and refresh claim intervals
func (*HTTPJobQueue) Jobs ¶
func (q *HTTPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs consumes new jobs from job-board
func (*HTTPJobQueue) Name ¶
func (q *HTTPJobQueue) Name() string
Name returns the name of this queue type, wow!
type Job ¶
type Job interface { Payload() *JobPayload RawPayload() *simplejson.Json StartAttributes() *backend.StartAttributes FinishState() FinishState Requeued() bool Received(gocontext.Context) error Started(gocontext.Context) error Error(gocontext.Context, string) error Requeue(gocontext.Context) error Finish(gocontext.Context, FinishState) error LogWriter(gocontext.Context, time.Duration) (LogWriter, error) Name() string SetupContext(gocontext.Context) gocontext.Context }
A Job ties togeher all the elements required for a build job
type JobJobPayload ¶
type JobJobPayload struct { ID uint64 `json:"id"` Number string `json:"number"` QueuedAt *time.Time `json:"queued_at"` }
JobJobPayload contains information about the job.
type JobMetaPayload ¶
type JobMetaPayload struct {
StateUpdateCount uint `json:"state_update_count"`
}
JobMetaPayload contains meta information about the job.
type JobPayload ¶
type JobPayload struct { Type string `json:"type"` Job JobJobPayload `json:"job"` Build BuildPayload `json:"source"` Repository RepositoryPayload `json:"repository"` UUID string `json:"uuid"` Config map[string]interface{} `json:"config"` Timeouts TimeoutsPayload `json:"timeouts,omitempty"` VMType string `json:"vm_type"` VMConfig backend.VmConfig `json:"vm_config"` Meta JobMetaPayload `json:"meta"` Queue string `json:"queue"` Trace bool `json:"trace"` Warmer bool `json:"warmer"` }
JobPayload is the payload we receive over RabbitMQ.
type JobQueue ¶
type JobQueue interface { Jobs(gocontext.Context) (<-chan Job, error) Name() string Cleanup() error }
JobQueue is the minimal interface needed by a ProcessorPool
type JobStartedMeta ¶
type JobStartedMeta struct { QueuedAt *time.Time `json:"queued_at"` Repo string `json:"repo"` Queue string `json:"queue"` Infra string `json:"infra"` }
JobStartedMeta is metadata that is useful for computing time to first log line downstream, and breaking it down into further dimensions.
type LogWriter ¶
type LogWriter interface { io.WriteCloser WriteAndClose([]byte) (int, error) Timeout() <-chan time.Time SetMaxLogLength(int) SetJobStarted(meta *JobStartedMeta) SetCancelFunc(gocontext.CancelFunc) MaxLengthReached() bool }
LogWriter is primarily an io.Writer that will send all bytes to travis-logs for processing, and also has some utility methods for timeouts and log length limiting. Each LogWriter is tied to a given job, and can be gotten by calling the LogWriter() method on a Job.
type LogWriterFactory ¶
type MultiSourceJobQueue ¶
type MultiSourceJobQueue struct {
// contains filtered or unexported fields
}
func NewMultiSourceJobQueue ¶
func NewMultiSourceJobQueue(queues ...JobQueue) *MultiSourceJobQueue
func (*MultiSourceJobQueue) Cleanup ¶
func (msjq *MultiSourceJobQueue) Cleanup() error
Cleanup runs cleanup for each source queue
func (*MultiSourceJobQueue) Jobs ¶
func (msjq *MultiSourceJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs returns a Job channel that selects over each source queue Job channel
func (*MultiSourceJobQueue) Name ¶
func (msjq *MultiSourceJobQueue) Name() string
Name builds a name from each source queue name
type Processor ¶
type Processor struct { ID string // ProcessedCount contains the number of jobs that has been processed // by this Processor. This value should not be modified outside of the // Processor. ProcessedCount int // CurrentStatus contains the current status of the processor, and can // be one of "new", "waiting", "processing" or "done". CurrentStatus string // LastJobID contains the ID of the last job the processor processed. LastJobID uint64 // contains filtered or unexported fields }
A Processor gets jobs off the job queue and coordinates running it with other components.
func NewProcessor ¶
func NewProcessor(ctx gocontext.Context, hostname string, queue JobQueue, logWriterFactory LogWriterFactory, provider backend.Provider, generator BuildScriptGenerator, persister BuildTracePersister, cancellationBroadcaster *CancellationBroadcaster, config ProcessorConfig) (*Processor, error)
NewProcessor creates a new processor that will run the build jobs on the given channel using the given provider and getting build scripts from the generator.
func (*Processor) GracefulShutdown ¶
func (p *Processor) GracefulShutdown()
GracefulShutdown tells the processor to finish the job it is currently processing, but not pick up any new jobs. This method will return immediately, the processor is done when Run() returns.
type ProcessorConfig ¶
type ProcessorPool ¶
type ProcessorPool struct { Context gocontext.Context Provider backend.Provider Generator BuildScriptGenerator Persister BuildTracePersister CancellationBroadcaster *CancellationBroadcaster Hostname string Config *config.Config // contains filtered or unexported fields }
A ProcessorPool spins up multiple Processors handling build jobs from the same queue.
func NewProcessorPool ¶
func NewProcessorPool(ppc *ProcessorPoolConfig, provider backend.Provider, generator BuildScriptGenerator, persister BuildTracePersister, cancellationBroadcaster *CancellationBroadcaster) *ProcessorPool
NewProcessorPool creates a new processor pool using the given arguments.
func (*ProcessorPool) Decr ¶ added in v1.3.0
func (p *ProcessorPool) Decr()
Decr pops a processor out of the pool and issues a graceful shutdown
func (*ProcessorPool) Each ¶ added in v1.3.0
func (p *ProcessorPool) Each(f func(int, *Processor))
Each loops through all the processors in the pool and calls the given function for each of them, passing in the index and the processor. The order of the processors is the same for the same set of processors.
func (*ProcessorPool) ExpectedSize ¶
func (p *ProcessorPool) ExpectedSize() int
ExpectedSize returns the size of the pool once gracefully shutdown processors complete.
After calling SetSize, ExpectedSize will soon reflect the requested new size, while Size will include processors that are still processing their last job before shutting down.
func (*ProcessorPool) GracefulShutdown ¶
func (p *ProcessorPool) GracefulShutdown(togglePause bool)
GracefulShutdown causes each processor in the pool to start its graceful shutdown.
func (*ProcessorPool) Incr ¶ added in v1.3.0
func (p *ProcessorPool) Incr()
Incr adds a single running processor to the pool
func (*ProcessorPool) Ready ¶
func (p *ProcessorPool) Ready() bool
Ready returns true if the processor pool is running as expected. Returns false if the processor pool has not been started yet.
func (*ProcessorPool) Run ¶
func (p *ProcessorPool) Run(poolSize int, queue JobQueue, logWriterFactory LogWriterFactory) error
Run starts up a number of processors and connects them to the given queue. This method stalls until all processors have finished.
func (*ProcessorPool) SetSize ¶
func (p *ProcessorPool) SetSize(newSize int)
SetSize adjust the pool to run the given number of processors.
This operates in an eventually consistent manner. Because some workers may be running jobs, we may not be able to immediately adjust the pool size. Once jobs finish, the pool size should rest at the given value.
func (*ProcessorPool) Size ¶ added in v1.3.0
func (p *ProcessorPool) Size() int
Size returns the number of processors that are currently running.
This includes processors that are in the process of gracefully shutting down. It's important to track these because they are still running jobs and thus still using resources that need to be managed and tracked.
func (*ProcessorPool) TotalProcessed ¶
func (p *ProcessorPool) TotalProcessed() int
TotalProcessed returns the sum of all processor ProcessedCount values.
type ProcessorPoolConfig ¶
type RemoteController ¶
type RemoteController struct {
// contains filtered or unexported fields
}
RemoteController provides an HTTP API for controlling worker.
func (*RemoteController) CheckAuth ¶
func (api *RemoteController) CheckAuth(next http.Handler) http.Handler
CheckAuth is a middleware for all HTTP API methods that ensures that the configured basic auth credentials were passed in the request.
func (*RemoteController) DecrementPool ¶
func (api *RemoteController) DecrementPool(w http.ResponseWriter, req *http.Request)
DecrementPool tells the worker to gracefully shutdown a processor.
func (*RemoteController) GetWorkerInfo ¶
func (api *RemoteController) GetWorkerInfo(w http.ResponseWriter, req *http.Request)
GetWorkerInfo writes a JSON payload with useful information about the current state of worker as a whole.
func (*RemoteController) HealthCheck ¶
func (api *RemoteController) HealthCheck(w http.ResponseWriter, req *http.Request)
HealthCheck indicates whether worker is currently functioning in a healthy way. This can be used by a system like Kubernetes to determine whether to replace an instance of worker with a new one.
func (*RemoteController) IncrementPool ¶
func (api *RemoteController) IncrementPool(w http.ResponseWriter, req *http.Request)
IncrementPool tells the worker to spin up another processor.
func (*RemoteController) ReadyCheck ¶
func (api *RemoteController) ReadyCheck(w http.ResponseWriter, req *http.Request)
ReadyCheck indicates whether the worker is ready to receive requests. This is intended to be used as a readiness check in a system like Kubernetes. We should not attempt to interact with the remote controller until this returns a successful status.
func (*RemoteController) SetContext ¶
func (api *RemoteController) SetContext(next http.Handler) http.Handler
SetContext is a middleware function that loads some values into the request context. This allows these values to be shown in logging.
func (*RemoteController) Setup ¶
func (api *RemoteController) Setup()
Setup installs the HTTP routes that will handle requests to the HTTP API.
func (*RemoteController) ShutdownWorker ¶
func (api *RemoteController) ShutdownWorker(w http.ResponseWriter, req *http.Request)
ShutdownWorker tells the worker to shutdown.
Options can be passed in the body that determine whether the shutdown is done gracefully or not.
func (*RemoteController) UpdateWorkerInfo ¶
func (api *RemoteController) UpdateWorkerInfo(w http.ResponseWriter, req *http.Request)
UpdateWorkerInfo allows reconfiguring some parts of worker on the fly.
The main use of this is adjusting the size of the processor pool without interrupting existing running jobs.
type RepositoryPayload ¶
RepositoryPayload contains information about the repository.
type SentryHook ¶ added in v1.3.0
SentryHook delivers logs to a sentry server
func NewSentryHook ¶ added in v1.3.0
func NewSentryHook(DSN string, levels []logrus.Level) (*SentryHook, error)
NewSentryHook creates a hook to be added to an instance of logger and initializes the raven client. This method sets the timeout to 100 milliseconds.
func (*SentryHook) Fire ¶ added in v1.3.0
func (hook *SentryHook) Fire(entry *logrus.Entry) error
Fire is called when an event should be sent to sentry
func (*SentryHook) Levels ¶ added in v1.3.0
func (hook *SentryHook) Levels() []logrus.Level
Levels returns the available logging levels.
type TimeoutsPayload ¶ added in v1.3.0
type TimeoutsPayload struct { HardLimit uint64 `json:"hard_limit"` LogSilence uint64 `json:"log_silence"` }
TimeoutsPayload contains information about any custom timeouts. The timeouts are given in seconds, and a value of 0 means no custom timeout is set.
Source Files ¶
- amqp_canceller.go
- amqp_job.go
- amqp_job_queue.go
- amqp_log_writer.go
- amqp_log_writer_factory.go
- build_script_generator.go
- build_trace_persister.go
- canceller.go
- cli.go
- doc.go
- file_job.go
- file_job_queue.go
- file_log_writer.go
- help.go
- http_job.go
- http_job_queue.go
- http_log_part_sink.go
- http_log_writer.go
- job.go
- job_queue.go
- log_writer.go
- log_writer_factory.go
- multi_source_job_queue.go
- package.go
- processor.go
- processor_pool.go
- remote_controller.go
- sentry_logrus_hook.go
- step_check_cancellation.go
- step_download_trace.go
- step_generate_script.go
- step_open_log_writer.go
- step_run_script.go
- step_send_received.go
- step_sleep.go
- step_start_instance.go
- step_subscribe_cancellation.go
- step_transform_build_json.go
- step_update_state.go
- step_upload_script.go
- step_write_worker_info.go
- version.go
Directories ¶
Path | Synopsis |
---|---|
Package backend provides the compute instance backends supported by Worker.
|
Package backend provides the compute instance backends supported by Worker. |
cmd
|
|
travis-worker
Package main implements the CLI for the travis-worker binary
|
Package main implements the CLI for the travis-worker binary |
Package config contains the flags and defaults for Worker configuration.
|
Package config contains the flags and defaults for Worker configuration. |
Package context contains functions to embed data in a context.
|
Package context contains functions to embed data in a context. |
Package image contains logic for image selection logic.
|
Package image contains logic for image selection logic. |
Package metrics provides easy methods to send metrics
|
Package metrics provides easy methods to send metrics |
Package ratelimit implements a rate limiter to avoid calling APIs too often.
|
Package ratelimit implements a rate limiter to avoid calling APIs too often. |