README ¶
Worker
Worker is the component of Travis CI that runs a CI job on some form of compute instance. It is responsible for getting the bash script from travis-build, spinning up the compute instance (VM, Docker container, or possibly something else), uploading the bash script, running it, and streaming the logs back to travis-logs. It also sends state updates to travis-hub.
Installing
from binary
Find the version you wish to install on the GitHub Releases page and download either the darwin-amd64 binary for OS X or the linux-amd64 binary for Linux. No other operating systems or architectures have pre-built binaries at this time.
from package
Use the ./bin/travis-worker-install script, or take a look at the packagecloud instructions.
from source
- install Go v1.7+
- clone this down into your $GOPATH
mkdir -p $GOPATH/src/github.com/travis-ci
git clone https://github.com/travis-ci/worker $GOPATH/src/github.com/travis-ci/worker
cd $GOPATH/src/github.com/travis-ci/worker
- install gvt:
go get -u github.com/FiloSottile/gvt
- install gometalinter:
go get -u github.com/alecthomas/gometalinter
gometalinter --install
make
Configuring Travis Worker
Travis Worker is configured with environment variables or command line flags via the urfave/cli library. A list of the non-dynamic flags and environment variables may be found by invoking the built-in help system:
travis-worker --help
Development: Running Travis Worker locally
This section is for anyone wishing to contribute code to Worker. The code itself should have godoc-compatible docs, which can be viewed on godoc.org: https://godoc.org/github.com/travis-ci/worker. This section is mainly a higher-level overview of the code.
Environment
Ensure you've defined the necessary environment variables (see .example.env).
Pull Docker images
$ docker pull travisci/ci-amethyst:packer-1504724461
$ docker tag travisci/ci-amethyst:packer-1504724461 travis:default
Configuration
For configuration, the job-board (TRAVIS_WORKER_JOB_BOARD_URL) and travis-build (TRAVIS_WORKER_BUILD_API_URI) URLs need to be set. These can be set to the staging values.
export TRAVIS_WORKER_JOB_BOARD_URL='https://travis-worker:API_KEY@job-board-staging.travis-ci.com'
export TRAVIS_WORKER_BUILD_API_URI='https://x:API_KEY@build-staging.travis-ci.org/script'
TRAVIS_WORKER_BUILD_API_URI can be found in the env of the job board app, e.g.:
heroku config:get JOB_BOARD_BUILD_API_ORG_URL -a job-board-staging
Configuring the requested provider/backend
Each provider requires its own configuration, which must be provided via environment variables namespaced by TRAVIS_WORKER_{PROVIDER}_.
The backend should be configured to be Docker, e.g.:
export TRAVIS_WORKER_PROVIDER_NAME='docker'
export TRAVIS_WORKER_DOCKER_ENDPOINT=unix:///var/run/docker.sock # or "tcp://localhost:4243"
export TRAVIS_WORKER_DOCKER_PRIVILEGED="false" # optional
export TRAVIS_WORKER_DOCKER_CERT_PATH="/etc/secret-docker-cert-stuff" # optional
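To see which variables fall under a provider's namespace, something like the following sketch can help. It is not part of Worker: `providerEnv` is a hypothetical helper that only filters environment entries by the TRAVIS_WORKER_{PROVIDER}_ prefix described above, and it assumes the provider name is upper-cased to form the prefix.

```go
package main

import (
	"fmt"
	"os"
	"sort"
	"strings"
)

// providerEnv is a hypothetical helper. It returns the entries of environ
// (in "KEY=value" form) whose keys start with TRAVIS_WORKER_<PROVIDER>_,
// keyed by the remainder of the variable name.
func providerEnv(provider string, environ []string) map[string]string {
	prefix := "TRAVIS_WORKER_" + strings.ToUpper(provider) + "_"
	out := map[string]string{}
	for _, kv := range environ {
		key, value, ok := strings.Cut(kv, "=")
		if !ok || !strings.HasPrefix(key, prefix) {
			continue
		}
		out[strings.TrimPrefix(key, prefix)] = value
	}
	return out
}

func main() {
	provider := os.Getenv("TRAVIS_WORKER_PROVIDER_NAME")
	if provider == "" {
		provider = "docker" // assumption: default to the Docker example above
	}
	cfg := providerEnv(provider, os.Environ())

	// Print only the setting names, sorted, so secrets are not echoed.
	keys := make([]string, 0, len(cfg))
	for k := range cfg {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k)
	}
}
```

With the exports above in place, this would list names such as ENDPOINT, PRIVILEGED and CERT_PATH. For the authoritative view of the parsed configuration, prefer the --echo-config flag.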
Queue configuration
For the queue configuration, there is a file-based queue implementation so you don't have to mess around with RabbitMQ.
You can generate a payload via the generate-job-payload.rb script on travis-scheduler:
$ heroku run -a travis-scheduler-staging script/generate-job-payload.rb <job id> > payload.json
Place the file in the $TRAVIS_WORKER_QUEUE_NAME/10-created.d/ directory, where it will be picked up by the worker.
See example-payload.json for an example payload.
Building and running
Run make build after making any changes. make also executes the test suite.
make
${GOPATH%%:*}/bin/travis-worker
or in Docker (FIXME):
docker build -t travis-worker . # or docker pull travisci/worker
docker run --env-file ENV_FILE -ti travis-worker # or travisci/worker
Testing
Run make test. To run backend tests matching Docker, for example, run go test -v ./backend -test.run Docker.
Verifying and exporting configuration
To inspect the parsed configuration in a format that can be used as a base environment variable configuration, use the --echo-config flag, which will exit immediately after writing to stdout:
travis-worker --echo-config
Stopping Travis Worker
Travis Worker has two shutdown modes: graceful and immediate. A graceful shutdown tells the worker not to start any additional jobs, but to finish the jobs it is currently running before it shuts down. An immediate shutdown makes the worker stop the jobs it is working on, requeue them, and clean up any open resources (shut down VMs, cleanly close connections, etc.).
To start a graceful shutdown, send an INT signal to the worker (for example using kill -INT). To start an immediate shutdown, send a TERM signal to the worker (for example using kill -TERM).
Go dependency management
Travis Worker is built via the standard go commands, and dependencies are managed by gvt.
To work with the dependencies, you need to do the following first:
- Have this repository checked out
- Install gvt with go get -u github.com/FiloSottile/gvt
Updating existing vendored dependencies
To update an existing vendored dependency, do the following in this directory:
gvt update name/of/dependency
e.g. gvt update github.com/pkg/sftp
Adding a new dependency
To add a new dependency, do the following:
gvt fetch name/of/package
e.g. gvt fetch github.com/pkg/sftp
Release process
The parts of the release process that haven't yet been automated look like this:
- review the diff since last release for silliness
- decide what the version bump should be
- update ./CHANGELOG.md (in a release prep branch)
- tag accordingly after merge
- update github release tag with relevant section from ./CHANGELOG.md
- attach binaries to github release tag
License and Copyright Information
See LICENSE file.
© 2017 Travis CI GmbH
Documentation ¶
Overview ¶
Package worker implements the core of Worker.
The main coordinating component in the core is the Processor, which is a good place to start when exploring the code. It gets jobs off the job queue and calls the right things in order to run them. The ProcessorPool starts up the required number of Processors to get the concurrency that's wanted for a single Worker instance.
Index ¶
- Constants
- Variables
- type AMQPCanceller
- type AMQPJobQueue
- type BuildPayload
- type BuildScriptGenerator
- type BuildScriptGeneratorError
- type CLI
- type CancellationBroadcaster
- type EnvVar
- type FileJobQueue
- type FinishState
- type HTTPJobQueue
- type Job
- type JobJobPayload
- type JobMetaPayload
- type JobPayload
- type JobQueue
- type LogWriter
- type MultiSourceJobQueue
- type Processor
- type ProcessorConfig
- type ProcessorPool
- func (p *ProcessorPool) Decr()
- func (p *ProcessorPool) Each(f func(int, *Processor))
- func (p *ProcessorPool) GracefulShutdown(togglePause bool)
- func (p *ProcessorPool) Incr()
- func (p *ProcessorPool) Run(poolSize int, queue JobQueue) error
- func (p *ProcessorPool) Size() int
- func (p *ProcessorPool) TotalProcessed() int
- type ProcessorPoolConfig
- type RepositoryPayload
- type SentryHook
- type TimeoutsPayload
Constants ¶
const (
	VMTypeDefault = "default"
	VMTypePremium = "premium"
)
Variables ¶
var (
	// LogWriterTick is how often the buffer should be flushed out and sent to
	// travis-logs.
	LogWriterTick = 500 * time.Millisecond

	// LogChunkSize is a bit of a magic number, calculated like this: The
	// maximum Pusher payload is 10 kB (or 10 KiB, who knows, but let's go with
	// 10 kB since that is smaller). Looking at the travis-logs source, the
	// current message overhead (i.e. the part of the payload that isn't
	// the content of the log part) is 42 bytes + the length of the JSON-
	// encoded ID and the length of the JSON-encoded sequence number. A 64-
	// bit number is up to 20 digits long, so that means (assuming we don't
	// go over 64-bit numbers) the overhead is up to 82 bytes. That means
	// we can send up to 9918 bytes of content. However, the JSON-encoded
	// version of a string can be significantly longer than the raw bytes.
	// Worst case that I could find is "<", which with the Go JSON encoder
	// becomes "\u003c" (i.e. six bytes long). So, given a string of just
	// left angle brackets, the string would become six times as long,
	// meaning that the longest string we can take is 1653. We could still
	// get errors if we go over 64-bit numbers, but I find the likeliness
	// of that happening to both the sequence number, the ID, and us maxing
	// out the worst-case logs to be quite unlikely, so I'm willing to live
	// with that. --Henrik
	LogChunkSize = 1653

	// ErrWrotePastMaxLogLength is returned by LogWriter.Write if the write
	// caused the number of written bytes to go over the maximum log length.
	ErrWrotePastMaxLogLength = errors.New("wrote past max length")
)
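The arithmetic in the LogChunkSize comment can be checked with a short program. This is a sketch that restates the numbers from the comment (10 kB payload, 42 bytes of fixed overhead, two numbers of up to 20 digits each); the constant and function names are hypothetical, and the six-byte expansion of "<" is measured with `encoding/json` rather than hard-coded.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const (
	maxPayloadBytes = 10000 // 10 kB Pusher limit, as assumed in the comment
	fixedOverhead   = 42    // message overhead excluding ID and sequence number
	maxDigits       = 20    // longest decimal form of a 64-bit number
)

// encodedLen returns how many bytes encoding/json emits for s, not
// counting the surrounding quotes.
func encodedLen(s string) int {
	b, err := json.Marshal(s)
	if err != nil {
		panic(err)
	}
	return len(b) - 2
}

// chunkSize reproduces the calculation from the LogChunkSize comment.
func chunkSize() int {
	overhead := fixedOverhead + 2*maxDigits // 82 bytes
	content := maxPayloadBytes - overhead   // 9918 bytes
	return content / encodedLen("<")        // worst case: every byte is "<"
}

func main() {
	fmt.Println(encodedLen("<"), chunkSize()) // prints "6 1653"
}
```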
var (
	// VersionString is the git describe version set at build time
	VersionString = "?"
	// RevisionString is the git revision set at build time
	RevisionString = "?"
	// RevisionURLString is the full URL to the revision set at build time
	RevisionURLString = "?"
	// GeneratedString is the build date set at build time
	GeneratedString = "?"
	// CopyrightString is the copyright set at build time
	CopyrightString = "?"
)
Functions ¶
This section is empty.
Types ¶
type AMQPCanceller ¶ added in v1.3.0
type AMQPCanceller struct {
// contains filtered or unexported fields
}
AMQPCanceller is responsible for listening to a command queue on AMQP and dispatching the commands to the right place. Currently the only valid command is the 'cancel job' command.
func NewAMQPCanceller ¶ added in v1.3.0
func NewAMQPCanceller(ctx gocontext.Context, conn *amqp.Connection, cancellationBroadcaster *CancellationBroadcaster) *AMQPCanceller
NewAMQPCanceller creates a new AMQPCanceller. No network traffic occurs until you call Run().
func (*AMQPCanceller) Run ¶ added in v1.3.0
func (d *AMQPCanceller) Run()
Run will make the AMQPCanceller listen to the worker command queue and start dispatching any incoming commands.
type AMQPJobQueue ¶ added in v1.3.0
type AMQPJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
AMQPJobQueue is a JobQueue that uses AMQP
func NewAMQPJobQueue ¶ added in v1.3.0
func NewAMQPJobQueue(conn *amqp.Connection, queue string) (*AMQPJobQueue, error)
NewAMQPJobQueue creates an AMQPJobQueue backed by the given AMQP connection and connects to the AMQP queue with the given name. The queue will be declared in AMQP when this function is called, so an error could be raised if the queue already exists but with different attributes than we expect.
func (*AMQPJobQueue) Cleanup ¶ added in v1.3.0
func (q *AMQPJobQueue) Cleanup() error
Cleanup closes the underlying AMQP connection
func (*AMQPJobQueue) Jobs ¶ added in v1.3.0
func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs creates a new consumer on the queue and returns a channel, together with an error. The channel is sent every Job that we receive from AMQP.
func (*AMQPJobQueue) Name ¶
func (q *AMQPJobQueue) Name() string
Name returns the name of this queue type, wow!
type BuildPayload ¶
BuildPayload contains information about the build.
type BuildScriptGenerator ¶
A BuildScriptGenerator generates a build script for a given job payload.
func NewBuildScriptGenerator ¶
func NewBuildScriptGenerator(cfg *config.Config) BuildScriptGenerator
NewBuildScriptGenerator creates a generator backed by an HTTP API.
type BuildScriptGeneratorError ¶
type BuildScriptGeneratorError struct {
	// true when this error can be recovered by retrying later
	Recover bool
	// contains filtered or unexported fields
}
A BuildScriptGeneratorError is sometimes used by the Generate method on a BuildScriptGenerator to return more metadata about an error.
type CLI ¶ added in v1.3.0
type CLI struct {
	Config                  *config.Config
	BuildScriptGenerator    BuildScriptGenerator
	BackendProvider         backend.Provider
	ProcessorPool           *ProcessorPool
	CancellationBroadcaster *CancellationBroadcaster
	JobQueue                JobQueue
	// contains filtered or unexported fields
}
CLI is the top level of execution for the whole shebang
type CancellationBroadcaster ¶
type CancellationBroadcaster struct {
// contains filtered or unexported fields
}
A CancellationBroadcaster allows you to subscribe to and unsubscribe from cancellation messages for a given job ID.
func NewCancellationBroadcaster ¶
func NewCancellationBroadcaster() *CancellationBroadcaster
NewCancellationBroadcaster sets up a new cancellation broadcaster with an empty registry.
func (*CancellationBroadcaster) Broadcast ¶
func (cb *CancellationBroadcaster) Broadcast(id uint64)
Broadcast broadcasts a cancellation message to all currently subscribed cancellers.
func (*CancellationBroadcaster) Subscribe ¶
func (cb *CancellationBroadcaster) Subscribe(id uint64) <-chan struct{}
Subscribe will set up a subscription for cancellation messages for the given job ID. When a cancellation message comes in, the returned channel will be closed.
func (*CancellationBroadcaster) Unsubscribe ¶
func (cb *CancellationBroadcaster) Unsubscribe(id uint64, ch <-chan struct{})
Unsubscribe removes an existing subscription for the channel.
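The subscribe/broadcast/unsubscribe contract described above can be illustrated with a small stand-in. This is a sketch under assumptions, not the package's implementation: the `broadcaster` type and its methods are hypothetical, and the only behaviour taken from the documentation is that a subscriber's channel is closed when a cancellation for its job ID is broadcast.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical stand-in for CancellationBroadcaster.
type broadcaster struct {
	mu   sync.Mutex
	subs map[uint64][]chan struct{}
}

func newBroadcaster() *broadcaster {
	return &broadcaster{subs: make(map[uint64][]chan struct{})}
}

// subscribe registers a new channel for the job ID and returns its
// receive-only side.
func (b *broadcaster) subscribe(id uint64) <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan struct{})
	b.subs[id] = append(b.subs[id], ch)
	return ch
}

// unsubscribe removes one subscription without closing its channel.
func (b *broadcaster) unsubscribe(id uint64, ch <-chan struct{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	kept := b.subs[id][:0]
	for _, c := range b.subs[id] {
		if (<-chan struct{})(c) != ch {
			kept = append(kept, c)
		}
	}
	b.subs[id] = kept
}

// broadcast closes every channel subscribed to the job ID and then
// forgets the subscriptions.
func (b *broadcaster) broadcast(id uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, c := range b.subs[id] {
		close(c)
	}
	delete(b.subs, id)
}

func main() {
	b := newBroadcaster()
	ch := b.subscribe(42)
	b.broadcast(42)
	_, open := <-ch
	fmt.Println("channel open after broadcast:", open)
}
```

Closing the channel rather than sending on it lets any number of receivers observe the cancellation without the broadcaster blocking.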
type FileJobQueue ¶ added in v1.3.0
type FileJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
FileJobQueue is a JobQueue that uses directories for input, state, and output
func NewFileJobQueue ¶ added in v1.3.0
func NewFileJobQueue(baseDir, queue string, pollingInterval time.Duration) (*FileJobQueue, error)
NewFileJobQueue creates a *FileJobQueue from a base directory and queue name
func (*FileJobQueue) Cleanup ¶ added in v1.3.0
func (f *FileJobQueue) Cleanup() error
Cleanup is a no-op
func (*FileJobQueue) Jobs ¶ added in v1.3.0
func (f *FileJobQueue) Jobs(ctx gocontext.Context) (<-chan Job, error)
Jobs returns a channel of jobs from the created directory
func (*FileJobQueue) Name ¶
func (q *FileJobQueue) Name() string
Name returns the name of this queue type, wow!
type FinishState ¶
type FinishState string
FinishState is the state that a job finished with (such as pass/fail/etc.). You should not provide a string directly, but use one of the FinishStateX constants defined in this package.
const (
	FinishStatePassed    FinishState = "passed"
	FinishStateFailed    FinishState = "failed"
	FinishStateErrored   FinishState = "errored"
	FinishStateCancelled FinishState = "cancelled"
)
Valid finish states for the FinishState type
type HTTPJobQueue ¶
type HTTPJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
HTTPJobQueue is a JobQueue that uses http
func NewHTTPJobQueue ¶
func NewHTTPJobQueue(jobBoardURL *url.URL, site, providerName, queue string, cb *CancellationBroadcaster) (*HTTPJobQueue, error)
NewHTTPJobQueue creates a new http job queue
func NewHTTPJobQueueWithIntervals ¶
func NewHTTPJobQueueWithIntervals(jobBoardURL *url.URL, site, providerName, queue string, pollInterval, refreshClaimInterval time.Duration, cb *CancellationBroadcaster) (*HTTPJobQueue, error)
NewHTTPJobQueueWithIntervals creates a new http job queue with the specified poll and refresh claim intervals
func (*HTTPJobQueue) Jobs ¶
func (q *HTTPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs consumes new jobs from job-board
func (*HTTPJobQueue) Name ¶
func (q *HTTPJobQueue) Name() string
Name returns the name of this queue type, wow!
type Job ¶
type Job interface {
	Payload() *JobPayload
	RawPayload() *simplejson.Json
	StartAttributes() *backend.StartAttributes

	Received(gocontext.Context) error
	Started(gocontext.Context) error
	Error(gocontext.Context, string) error
	Requeue(gocontext.Context) error
	Finish(gocontext.Context, FinishState) error

	LogWriter(gocontext.Context, time.Duration) (LogWriter, error)
	Name() string
	SetupContext(gocontext.Context) gocontext.Context
}
A Job ties together all the elements required for a build job.
type JobJobPayload ¶
type JobJobPayload struct {
	ID       uint64     `json:"id"`
	Number   string     `json:"number"`
	QueuedAt *time.Time `json:"queued_at"`
}
JobJobPayload contains information about the job.
type JobMetaPayload ¶
type JobMetaPayload struct {
StateUpdateCount uint `json:"state_update_count"`
}
JobMetaPayload contains meta information about the job.
type JobPayload ¶
type JobPayload struct {
	Type       string                 `json:"type"`
	Job        JobJobPayload          `json:"job"`
	Build      BuildPayload           `json:"source"`
	Repository RepositoryPayload      `json:"repository"`
	UUID       string                 `json:"uuid"`
	Config     map[string]interface{} `json:"config"`
	Timeouts   TimeoutsPayload        `json:"timeouts,omitempty"`
	VMType     string                 `json:"vm_type"`
	Meta       JobMetaPayload         `json:"meta"`
}
JobPayload is the payload we receive over RabbitMQ.
type JobQueue ¶
type JobQueue interface {
	Jobs(gocontext.Context) (<-chan Job, error)
	Name() string
	Cleanup() error
}
JobQueue is the minimal interface needed by a ProcessorPool
type LogWriter ¶
type LogWriter interface {
	io.WriteCloser
	WriteAndClose([]byte) (int, error)
	Timeout() <-chan time.Time
	SetMaxLogLength(int)
}
LogWriter is primarily an io.Writer that will send all bytes to travis-logs for processing, and also has some utility methods for timeouts and log length limiting. Each LogWriter is tied to a given job, and can be gotten by calling the LogWriter() method on a Job.
type MultiSourceJobQueue ¶
type MultiSourceJobQueue struct {
// contains filtered or unexported fields
}
func NewMultiSourceJobQueue ¶
func NewMultiSourceJobQueue(queues ...JobQueue) *MultiSourceJobQueue
func (*MultiSourceJobQueue) Cleanup ¶
func (msjq *MultiSourceJobQueue) Cleanup() error
Cleanup runs cleanup for each source queue
func (*MultiSourceJobQueue) Jobs ¶
func (msjq *MultiSourceJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs returns a Job channel that selects over each source queue Job channel
func (*MultiSourceJobQueue) Name ¶
func (msjq *MultiSourceJobQueue) Name() string
Name builds a name from each source queue name
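Combining several job sources into one channel is a fan-in. The sketch below shows one common way to do it in Go, with one forwarding goroutine per source; this is a swapped-in technique for illustration and may differ from how MultiSourceJobQueue selects over its source channels. Jobs are represented as plain strings, and `mergeJobs` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeJobs forwards every value from the source channels onto a single
// output channel, which is closed once all sources have been closed.
func mergeJobs(sources ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src <-chan string) {
			defer wg.Done()
			for job := range src {
				out <- job
			}
		}(src)
	}
	// Close the output only after every forwarding goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	amqpJobs := make(chan string, 2)
	httpJobs := make(chan string, 1)
	amqpJobs <- "amqp-job-1"
	amqpJobs <- "amqp-job-2"
	httpJobs <- "http-job-1"
	close(amqpJobs)
	close(httpJobs)

	// The arrival order across sources is not deterministic.
	for job := range mergeJobs(amqpJobs, httpJobs) {
		fmt.Println(job)
	}
}
```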
type Processor ¶
type Processor struct {
	ID string

	// ProcessedCount contains the number of jobs that has been processed
	// by this Processor. This value should not be modified outside of the
	// Processor.
	ProcessedCount int

	// CurrentStatus contains the current status of the processor, and can
	// be one of "new", "waiting", "processing" or "done".
	CurrentStatus string

	// LastJobID contains the ID of the last job the processor processed.
	LastJobID uint64

	SkipShutdownOnLogTimeout bool
	// contains filtered or unexported fields
}
A Processor gets jobs off the job queue and coordinates running them with other components.
func NewProcessor ¶
func NewProcessor(ctx gocontext.Context, hostname string, queue JobQueue, provider backend.Provider, generator BuildScriptGenerator, cancellationBroadcaster *CancellationBroadcaster, config ProcessorConfig) (*Processor, error)
NewProcessor creates a new processor that will run the build jobs on the given channel using the given provider and getting build scripts from the generator.
func (*Processor) GracefulShutdown ¶
func (p *Processor) GracefulShutdown()
GracefulShutdown tells the processor to finish the job it is currently processing, but not pick up any new jobs. This method returns immediately; the processor is done when Run() returns.
type ProcessorConfig ¶
type ProcessorPool ¶
type ProcessorPool struct {
	Context                 gocontext.Context
	Provider                backend.Provider
	Generator               BuildScriptGenerator
	CancellationBroadcaster *CancellationBroadcaster
	Hostname                string

	HardTimeout, InitialSleep, LogTimeout, ScriptUploadTimeout, StartupTimeout time.Duration
	MaxLogLength                                                               int

	PayloadFilterExecutable string

	SkipShutdownOnLogTimeout bool
	// contains filtered or unexported fields
}
A ProcessorPool spins up multiple Processors handling build jobs from the same queue.
func NewProcessorPool ¶
func NewProcessorPool(ppc *ProcessorPoolConfig, provider backend.Provider, generator BuildScriptGenerator, cancellationBroadcaster *CancellationBroadcaster) *ProcessorPool
NewProcessorPool creates a new processor pool using the given arguments.
func (*ProcessorPool) Decr ¶ added in v1.3.0
func (p *ProcessorPool) Decr()
Decr pops a processor out of the pool and issues a graceful shutdown
func (*ProcessorPool) Each ¶ added in v1.3.0
func (p *ProcessorPool) Each(f func(int, *Processor))
Each loops through all the processors in the pool and calls the given function for each of them, passing in the index and the processor. The order of the processors is the same for the same set of processors.
func (*ProcessorPool) GracefulShutdown ¶
func (p *ProcessorPool) GracefulShutdown(togglePause bool)
GracefulShutdown causes each processor in the pool to start its graceful shutdown.
func (*ProcessorPool) Incr ¶ added in v1.3.0
func (p *ProcessorPool) Incr()
Incr adds a single running processor to the pool
func (*ProcessorPool) Run ¶
func (p *ProcessorPool) Run(poolSize int, queue JobQueue) error
Run starts up a number of processors and connects them to the given queue. This method stalls until all processors have finished.
func (*ProcessorPool) Size ¶ added in v1.3.0
func (p *ProcessorPool) Size() int
Size returns the number of processors in the pool
func (*ProcessorPool) TotalProcessed ¶
func (p *ProcessorPool) TotalProcessed() int
TotalProcessed returns the sum of all processor ProcessedCount values.
type ProcessorPoolConfig ¶
type RepositoryPayload ¶
RepositoryPayload contains information about the repository.
type SentryHook ¶ added in v1.3.0
SentryHook delivers logs to a sentry server
func NewSentryHook ¶ added in v1.3.0
func NewSentryHook(DSN string, levels []logrus.Level) (*SentryHook, error)
NewSentryHook creates a hook to be added to an instance of logger and initializes the raven client. This method sets the timeout to 100 milliseconds.
func (*SentryHook) Fire ¶ added in v1.3.0
func (hook *SentryHook) Fire(entry *logrus.Entry) error
Fire is called when an event should be sent to sentry
func (*SentryHook) Levels ¶ added in v1.3.0
func (hook *SentryHook) Levels() []logrus.Level
Levels returns the available logging levels.
type TimeoutsPayload ¶ added in v1.3.0
type TimeoutsPayload struct {
	HardLimit  uint64 `json:"hard_limit"`
	LogSilence uint64 `json:"log_silence"`
}
TimeoutsPayload contains information about any custom timeouts. The timeouts are given in seconds, and a value of 0 means no custom timeout is set.
Source Files ¶
- amqp_canceller.go
- amqp_job.go
- amqp_job_queue.go
- amqp_log_writer.go
- build_script_generator.go
- canceller.go
- cli.go
- doc.go
- file_job.go
- file_job_queue.go
- file_log_writer.go
- help.go
- http_job.go
- http_job_queue.go
- http_log_part_sink.go
- http_log_writer.go
- job.go
- job_queue.go
- log_writer.go
- multi_source_job_queue.go
- package.go
- processor.go
- processor_pool.go
- sentry_logrus_hook.go
- step_check_cancellation.go
- step_generate_script.go
- step_open_log_writer.go
- step_run_script.go
- step_send_received.go
- step_sleep.go
- step_start_instance.go
- step_subscribe_cancellation.go
- step_transform_build_json.go
- step_update_state.go
- step_upload_script.go
- step_write_worker_info.go
- version.go
Directories ¶
Path | Synopsis |
---|---|
backend | Package backend provides the compute instance backends supported by Worker. |
cmd/travis-worker | Package main implements the CLI for the travis-worker binary. |
config | Package config contains the flags and defaults for Worker configuration. |
context | Package context contains functions to embed data in a context. |
image | Package image contains logic for image selection. |
metrics | Package metrics provides easy methods to send metrics. |
ratelimit | Package ratelimit implements a rate limiter to avoid calling APIs too often. |