Documentation ¶
Overview ¶
Package worker implements the core of Worker.
The Processor is the main coordinating component in the core and is a good place to start when exploring the code. It takes jobs off the job queue and calls the components needed to run each one. The ProcessorPool starts as many Processors as are needed to reach the desired concurrency for a single Worker instance.
Index ¶
- Constants
- Variables
- type AMQPCanceller
- type AMQPJobQueue
- type BuildPayload
- type BuildScriptGenerator
- type BuildScriptGeneratorError
- type CLI
- type CancellationBroadcaster
- type EnvVar
- type FileJobQueue
- type FinishState
- type HTTPJobQueue
- type Job
- type JobJobPayload
- type JobMetaPayload
- type JobPayload
- type JobQueue
- type LogWriter
- type LogsQueue
- type MultiSourceJobQueue
- type Processor
- type ProcessorConfig
- type ProcessorPool
- func (p *ProcessorPool) Decr()
- func (p *ProcessorPool) Each(f func(int, *Processor))
- func (p *ProcessorPool) GracefulShutdown(togglePause bool)
- func (p *ProcessorPool) Incr()
- func (p *ProcessorPool) Run(poolSize int, queue JobQueue) error
- func (p *ProcessorPool) Size() int
- func (p *ProcessorPool) TotalProcessed() int
- type ProcessorPoolConfig
- type RepositoryPayload
- type SentryHook
- type TimeoutsPayload
Constants ¶
const (
    VMTypeDefault = "default"
    VMTypePremium = "premium"
)
Variables ¶
var (
    // LogWriterTick is how often the buffer should be flushed out and sent to
    // travis-logs.
    LogWriterTick = 500 * time.Millisecond

    // LogChunkSize is a bit of a magic number, calculated like this: The
    // maximum Pusher payload is 10 kB (or 10 KiB, who knows, but let's go with
    // 10 kB since that is smaller). Looking at the travis-logs source, the
    // current message overhead (i.e. the part of the payload that isn't
    // the content of the log part) is 42 bytes + the length of the
    // JSON-encoded ID and the length of the JSON-encoded sequence number. A
    // 64-bit number is up to 20 digits long, so that means (assuming we don't
    // go over 64-bit numbers) the overhead is up to 82 bytes. That means
    // we can send up to 9918 bytes of content. However, the JSON-encoded
    // version of a string can be significantly longer than the raw bytes.
    // Worst case that I could find is "<", which with the Go JSON encoder
    // becomes "\u003c" (i.e. six bytes long). So, given a string of just
    // left angle brackets, the string would become six times as long,
    // meaning that the longest string we can take is 1653. We could still
    // get errors if we go over 64-bit numbers, but I find the likeliness
    // of that happening to both the sequence number, the ID, and us maxing
    // out the worst-case logs to be quite unlikely, so I'm willing to live
    // with that. --Henrik
    LogChunkSize = 1653

    // ErrWrotePastMaxLogLength is returned by LogWriter.Write if the write
    // caused the number of written bytes to go over the maximum log length.
    ErrWrotePastMaxLogLength = errors.New("wrote past max length")
)
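The arithmetic in the LogChunkSize comment can be checked with a short program. This is a sketch only: the constant and function names (maxPayloadBytes, maxOverheadBytes, worstCaseExpansion, chunkSize) are hypothetical and not part of the package, and the 10 kB and 82 byte figures are taken from the comment rather than measured.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const (
	maxPayloadBytes  = 10000 // 10 kB payload limit, as stated in the comment
	maxOverheadBytes = 82    // 42 fixed bytes plus two numbers of up to 20 digits
)

// worstCaseExpansion returns how many bytes the Go JSON encoder emits for a
// single "<", not counting the surrounding quotes.
func worstCaseExpansion() int {
	encoded, err := json.Marshal("<")
	if err != nil {
		panic(err)
	}
	return len(encoded) - 2 // drop the opening and closing quote
}

// chunkSize derives the largest raw chunk that still fits in the content
// budget when every byte expands by the worst-case factor.
func chunkSize() int {
	return (maxPayloadBytes - maxOverheadBytes) / worstCaseExpansion()
}

func main() {
	fmt.Println("content budget:", maxPayloadBytes-maxOverheadBytes)
	fmt.Println("worst-case expansion:", worstCaseExpansion())
	fmt.Println("chunk size:", chunkSize())
}
```

The division is exact here (9918 / 6), so the derived value matches the constant without rounding.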
var (
    // VersionString is the git describe version set at build time
    VersionString = "?"
    // RevisionString is the git revision set at build time
    RevisionString = "?"
    // RevisionURLString is the full URL to the revision set at build time
    RevisionURLString = "?"
    // GeneratedString is the build date set at build time
    GeneratedString = "?"
    // CopyrightString is the copyright set at build time
    CopyrightString = "?"
)
Functions ¶
This section is empty.
Types ¶
type AMQPCanceller ¶ added in v1.3.0
type AMQPCanceller struct {
// contains filtered or unexported fields
}
AMQPCanceller is responsible for listening to a command queue on AMQP and dispatching the commands to the right place. Currently the only valid command is the 'cancel job' command.
func NewAMQPCanceller ¶ added in v1.3.0
func NewAMQPCanceller(ctx gocontext.Context, conn *amqp.Connection, cancellationBroadcaster *CancellationBroadcaster) *AMQPCanceller
NewAMQPCanceller creates a new AMQPCanceller. No network traffic occurs until you call Run().
func (*AMQPCanceller) Run ¶ added in v1.3.0
func (d *AMQPCanceller) Run()
Run will make the AMQPCanceller listen to the worker command queue and start dispatching any incoming commands.
type AMQPJobQueue ¶ added in v1.3.0
type AMQPJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
AMQPJobQueue is a JobQueue that uses AMQP
func NewAMQPJobQueue ¶ added in v1.3.0
func NewAMQPJobQueue(conn *amqp.Connection, queue string, stateUpdatePoolSize int) (*AMQPJobQueue, error)
NewAMQPJobQueue creates an AMQPJobQueue backed by the given AMQP connection and connects to the AMQP queue with the given name. The queue is declared in AMQP when this function is called, so an error can be returned if the queue already exists with different attributes than expected.
func (*AMQPJobQueue) Cleanup ¶ added in v1.3.0
func (q *AMQPJobQueue) Cleanup() error
Cleanup closes the underlying AMQP connection
func (*AMQPJobQueue) Jobs ¶ added in v1.3.0
func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs creates a new consumer on the queue and returns a channel and an error. The channel is sent every Job that we receive from AMQP. The signature takes a context rather than a stop channel.
func (*AMQPJobQueue) Name ¶
func (q *AMQPJobQueue) Name() string
Name returns the name of this queue type, wow!
type BuildPayload ¶
BuildPayload contains information about the build.
type BuildScriptGenerator ¶
A BuildScriptGenerator generates a build script for a given job payload.
func NewBuildScriptGenerator ¶
func NewBuildScriptGenerator(cfg *config.Config) BuildScriptGenerator
NewBuildScriptGenerator creates a generator backed by an HTTP API.
type BuildScriptGeneratorError ¶
type BuildScriptGeneratorError struct {
    // true when this error can be recovered by retrying later
    Recover bool
    // contains filtered or unexported fields
}
A BuildScriptGeneratorError is sometimes used by the Generate method on a BuildScriptGenerator to return more metadata about an error.
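A caller could use the Recover flag to decide whether a failed script generation is worth retrying. The sketch below does not use the real type: scriptError is a hypothetical stand-in with a pointer receiver, and shouldRetry is an invented helper. Whether the package returns the error by value or by pointer is not stated here, so treat the errors.As target as an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// scriptError is a hypothetical stand-in for BuildScriptGeneratorError.
type scriptError struct {
	msg     string
	Recover bool // true when retrying later may succeed
}

func (e *scriptError) Error() string { return e.msg }

// shouldRetry reports whether err is, or wraps, a scriptError that is
// marked as recoverable. Any other error is treated as permanent.
func shouldRetry(err error) bool {
	var se *scriptError
	if errors.As(err, &se) {
		return se.Recover
	}
	return false
}

func main() {
	wrapped := fmt.Errorf("generate: %w", &scriptError{msg: "upstream timeout", Recover: true})
	fmt.Println(shouldRetry(wrapped))
	fmt.Println(shouldRetry(errors.New("bad payload")))
}
```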
type CLI ¶ added in v1.3.0
type CLI struct {
    Config *config.Config
    BuildScriptGenerator BuildScriptGenerator
    BackendProvider backend.Provider
    ProcessorPool *ProcessorPool
    CancellationBroadcaster *CancellationBroadcaster
    JobQueue JobQueue
    LogsQueue LogsQueue
    // contains filtered or unexported fields
}
CLI is the top level of execution for the whole shebang
type CancellationBroadcaster ¶
type CancellationBroadcaster struct {
// contains filtered or unexported fields
}
A CancellationBroadcaster allows you to subscribe to and unsubscribe from cancellation messages for a given job ID.
func NewCancellationBroadcaster ¶
func NewCancellationBroadcaster() *CancellationBroadcaster
NewCancellationBroadcaster sets up a new cancellation broadcaster with an empty registry.
func (*CancellationBroadcaster) Broadcast ¶
func (cb *CancellationBroadcaster) Broadcast(id uint64)
Broadcast broadcasts a cancellation message to all currently subscribed cancellers.
func (*CancellationBroadcaster) Subscribe ¶
func (cb *CancellationBroadcaster) Subscribe(id uint64) <-chan struct{}
Subscribe will set up a subscription for cancellation messages for the given job ID. When a cancellation message comes in, the returned channel will be closed.
func (*CancellationBroadcaster) Unsubscribe ¶
func (cb *CancellationBroadcaster) Unsubscribe(id uint64, ch <-chan struct{})
Unsubscribe removes an existing subscription for the channel.
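The subscribe, broadcast, and unsubscribe behaviour described above can be modelled with a mutex-guarded map from job ID to channels. This is a minimal sketch, not the package's implementation: the broadcaster type, newBroadcaster, and isClosed are hypothetical names, and clearing the registry entry after a broadcast is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical stand-in for CancellationBroadcaster.
type broadcaster struct {
	mu       sync.Mutex
	registry map[uint64][]chan struct{}
}

func newBroadcaster() *broadcaster {
	return &broadcaster{registry: make(map[uint64][]chan struct{})}
}

// Subscribe registers a new channel for id. The channel is closed when
// Broadcast is called for the same id.
func (b *broadcaster) Subscribe(id uint64) <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan struct{})
	b.registry[id] = append(b.registry[id], ch)
	return ch
}

// Unsubscribe removes ch from the registry for id without closing it.
func (b *broadcaster) Unsubscribe(id uint64, ch <-chan struct{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	subs := b.registry[id]
	for i, sub := range subs {
		if (<-chan struct{})(sub) == ch {
			b.registry[id] = append(subs[:i], subs[i+1:]...)
			break
		}
	}
	if len(b.registry[id]) == 0 {
		delete(b.registry, id)
	}
}

// Broadcast closes every channel subscribed to id and clears the entry.
func (b *broadcaster) Broadcast(id uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.registry[id] {
		close(ch)
	}
	delete(b.registry, id)
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	b := newBroadcaster()
	kept := b.Subscribe(42)
	dropped := b.Subscribe(42)
	b.Unsubscribe(42, dropped)
	b.Broadcast(42)
	fmt.Println(isClosed(kept), isClosed(dropped))
}
```

Closing a channel rather than sending on it lets any number of goroutines observe the cancellation, and a receiver that checks late still sees it.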
type FileJobQueue ¶ added in v1.3.0
type FileJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
FileJobQueue is a JobQueue that uses directories for input, state, and output
func NewFileJobQueue ¶ added in v1.3.0
func NewFileJobQueue(baseDir, queue string, pollingInterval time.Duration) (*FileJobQueue, error)
NewFileJobQueue creates a *FileJobQueue from a base directory and queue name
func (*FileJobQueue) Cleanup ¶ added in v1.3.0
func (f *FileJobQueue) Cleanup() error
Cleanup is a no-op
func (*FileJobQueue) Jobs ¶ added in v1.3.0
func (f *FileJobQueue) Jobs(ctx gocontext.Context) (<-chan Job, error)
Jobs returns a channel of jobs from the created directory
func (*FileJobQueue) Name ¶
func (q *FileJobQueue) Name() string
Name returns the name of this queue type, wow!
type FinishState ¶
type FinishState string
FinishState is the state that a job finished with (such as pass/fail/etc.). You should not provide a string directly, but use one of the FinishStateX constants defined in this package.
const (
    FinishStatePassed FinishState = "passed"
    FinishStateFailed FinishState = "failed"
    FinishStateErrored FinishState = "errored"
    FinishStateCancelled FinishState = "cancelled"
)
Valid finish states for the FinishState type
type HTTPJobQueue ¶
type HTTPJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
HTTPJobQueue is a JobQueue that uses http
func NewHTTPJobQueue ¶
func NewHTTPJobQueue(jobBoardURL *url.URL, site, providerName, queue string, cb *CancellationBroadcaster) (*HTTPJobQueue, error)
NewHTTPJobQueue creates a new http job queue
func NewHTTPJobQueueWithIntervals ¶
func NewHTTPJobQueueWithIntervals(jobBoardURL *url.URL, site, providerName, queue string, pollInterval, refreshClaimInterval time.Duration, cb *CancellationBroadcaster) (*HTTPJobQueue, error)
NewHTTPJobQueueWithIntervals creates a new http job queue with the specified poll and refresh claim intervals
func (*HTTPJobQueue) Jobs ¶
func (q *HTTPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs consumes new jobs from job-board
func (*HTTPJobQueue) Name ¶
func (q *HTTPJobQueue) Name() string
Name returns the name of this queue type, wow!
type Job ¶
type Job interface {
    Payload() *JobPayload
    RawPayload() *simplejson.Json
    StartAttributes() *backend.StartAttributes
    Received(gocontext.Context) error
    Started(gocontext.Context) error
    Error(gocontext.Context, string) error
    Requeue(gocontext.Context) error
    Finish(gocontext.Context, FinishState) error
    LogWriter(gocontext.Context, time.Duration) (LogWriter, error)
    Name() string
    SetupContext(gocontext.Context) gocontext.Context
}
A Job ties together all the elements required for a build job
type JobJobPayload ¶
type JobJobPayload struct {
    ID uint64 `json:"id"`
    Number string `json:"number"`
    QueuedAt *time.Time `json:"queued_at"`
}
JobJobPayload contains information about the job.
type JobMetaPayload ¶
type JobMetaPayload struct {
StateUpdateCount uint `json:"state_update_count"`
}
JobMetaPayload contains meta information about the job.
type JobPayload ¶
type JobPayload struct {
    Type string `json:"type"`
    Job JobJobPayload `json:"job"`
    Build BuildPayload `json:"source"`
    Repository RepositoryPayload `json:"repository"`
    UUID string `json:"uuid"`
    Config map[string]interface{} `json:"config"`
    Timeouts TimeoutsPayload `json:"timeouts,omitempty"`
    VMType string `json:"vm_type"`
    Meta JobMetaPayload `json:"meta"`
}
JobPayload is the payload we receive over RabbitMQ.
type JobQueue ¶
type JobQueue interface {
    Jobs(gocontext.Context) (<-chan Job, error)
    Name() string
    Cleanup() error
}
JobQueue is the minimal interface needed by a ProcessorPool
type LogWriter ¶
type LogWriter interface {
    io.WriteCloser
    WriteAndClose([]byte) (int, error)
    Timeout() <-chan time.Time
    SetMaxLogLength(int)
}
LogWriter is primarily an io.Writer that will send all bytes to travis-logs for processing, and also has some utility methods for timeouts and log length limiting. Each LogWriter is tied to a given job, and can be gotten by calling the LogWriter() method on a Job.
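The log length limit can be pictured as a writer that keeps a running byte count and fails once the count passes the configured maximum. This is a sketch under assumptions: limitedWriter and errPastMax are hypothetical stand-ins for a LogWriter implementation and ErrWrotePastMaxLogLength, output goes to an in-memory buffer instead of travis-logs, and discarding the offending write is a guess at the behaviour.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errPastMax plays the role of ErrWrotePastMaxLogLength in this sketch.
var errPastMax = errors.New("wrote past max length")

// limitedWriter is a hypothetical stand-in for a LogWriter implementation.
type limitedWriter struct {
	buf      bytes.Buffer
	written  int // total bytes passed to Write so far
	maxBytes int
}

// SetMaxLogLength sets the maximum number of bytes the writer accepts.
func (w *limitedWriter) SetMaxLogLength(n int) { w.maxBytes = n }

// Write counts the incoming bytes and returns errPastMax as soon as the
// running total exceeds maxBytes. The offending write is discarded.
func (w *limitedWriter) Write(p []byte) (int, error) {
	w.written += len(p)
	if w.written > w.maxBytes {
		return 0, errPastMax
	}
	return w.buf.Write(p)
}

func main() {
	w := &limitedWriter{}
	w.SetMaxLogLength(5)
	for _, part := range []string{"abc", "def"} {
		n, err := w.Write([]byte(part))
		fmt.Println(n, err)
	}
}
```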
type MultiSourceJobQueue ¶
type MultiSourceJobQueue struct {
// contains filtered or unexported fields
}
func NewMultiSourceJobQueue ¶
func NewMultiSourceJobQueue(queues ...JobQueue) *MultiSourceJobQueue
func (*MultiSourceJobQueue) Cleanup ¶
func (msjq *MultiSourceJobQueue) Cleanup() error
Cleanup runs cleanup for each source queue
func (*MultiSourceJobQueue) Jobs ¶
func (msjq *MultiSourceJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs returns a Job channel that selects over each source queue Job channel
func (*MultiSourceJobQueue) Name ¶
func (msjq *MultiSourceJobQueue) Name() string
Name builds a name from each source queue name
type Processor ¶
type Processor struct {
    ID string

    // ProcessedCount contains the number of jobs that have been processed
    // by this Processor. This value should not be modified outside of the
    // Processor.
    ProcessedCount int

    // CurrentStatus contains the current status of the processor, and can
    // be one of "new", "waiting", "processing" or "done".
    CurrentStatus string

    // LastJobID contains the ID of the last job the processor processed.
    LastJobID uint64

    SkipShutdownOnLogTimeout bool
    // contains filtered or unexported fields
}
A Processor gets jobs off the job queue and coordinates running them with other components.
func NewProcessor ¶
func NewProcessor(ctx gocontext.Context, hostname string, queue JobQueue, provider backend.Provider, generator BuildScriptGenerator, cancellationBroadcaster *CancellationBroadcaster, config ProcessorConfig) (*Processor, error)
NewProcessor creates a new processor that will run the build jobs on the given channel using the given provider and getting build scripts from the generator.
func (*Processor) GracefulShutdown ¶
func (p *Processor) GracefulShutdown()
GracefulShutdown tells the processor to finish the job it is currently processing, but not pick up any new jobs. This method returns immediately; the processor is done when Run() returns.
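The "finish the current job, take no new ones" behaviour can be sketched with a shutdown channel that the run loop checks before each receive. Everything here is a hypothetical stand-in: miniProcessor, newMiniProcessor, and the integer job type are invented for illustration and say nothing about how Processor is built internally.

```go
package main

import (
	"fmt"
	"sync"
)

// miniProcessor is a hypothetical stand-in for Processor.
type miniProcessor struct {
	jobs      <-chan int
	shutdown  chan struct{}
	once      sync.Once
	processed int
}

func newMiniProcessor(jobs <-chan int) *miniProcessor {
	return &miniProcessor{jobs: jobs, shutdown: make(chan struct{})}
}

// GracefulShutdown returns immediately. Run notices the closed channel
// before it takes its next job. Calling it more than once is safe.
func (p *miniProcessor) GracefulShutdown() {
	p.once.Do(func() { close(p.shutdown) })
}

// Run handles jobs until the queue is closed or a shutdown is requested.
// A job that is already being handled always runs to completion.
func (p *miniProcessor) Run(handle func(int)) {
	for {
		// Check for shutdown first so a waiting job is never picked up.
		select {
		case <-p.shutdown:
			return
		default:
		}
		select {
		case <-p.shutdown:
			return
		case job, ok := <-p.jobs:
			if !ok {
				return
			}
			handle(job)
			p.processed++
		}
	}
}

func main() {
	jobs := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		jobs <- i
	}
	p := newMiniProcessor(jobs)
	p.Run(func(job int) {
		fmt.Println("processing", job)
		p.GracefulShutdown() // requested mid-job; this job still finishes
	})
	fmt.Println("processed:", p.processed, "left in queue:", len(jobs))
}
```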
type ProcessorConfig ¶
type ProcessorPool ¶
type ProcessorPool struct {
    Context gocontext.Context
    Provider backend.Provider
    Generator BuildScriptGenerator
    CancellationBroadcaster *CancellationBroadcaster
    Hostname string
    HardTimeout, InitialSleep, LogTimeout, ScriptUploadTimeout, StartupTimeout time.Duration
    MaxLogLength int
    PayloadFilterExecutable string
    SkipShutdownOnLogTimeout bool
    // contains filtered or unexported fields
}
A ProcessorPool spins up multiple Processors handling build jobs from the same queue.
func NewProcessorPool ¶
func NewProcessorPool(ppc *ProcessorPoolConfig, provider backend.Provider, generator BuildScriptGenerator, cancellationBroadcaster *CancellationBroadcaster) *ProcessorPool
NewProcessorPool creates a new processor pool using the given arguments.
func (*ProcessorPool) Decr ¶ added in v1.3.0
func (p *ProcessorPool) Decr()
Decr pops a processor out of the pool and issues a graceful shutdown
func (*ProcessorPool) Each ¶ added in v1.3.0
func (p *ProcessorPool) Each(f func(int, *Processor))
Each loops through all the processors in the pool and calls the given function for each of them, passing in the index and the processor. The order of the processors is the same for the same set of processors.
func (*ProcessorPool) GracefulShutdown ¶
func (p *ProcessorPool) GracefulShutdown(togglePause bool)
GracefulShutdown causes each processor in the pool to start its graceful shutdown.
func (*ProcessorPool) Incr ¶ added in v1.3.0
func (p *ProcessorPool) Incr()
Incr adds a single running processor to the pool
func (*ProcessorPool) Run ¶
func (p *ProcessorPool) Run(poolSize int, queue JobQueue) error
Run starts up a number of processors and connects them to the given queue. This method blocks until all processors have finished.
func (*ProcessorPool) Size ¶ added in v1.3.0
func (p *ProcessorPool) Size() int
Size returns the number of processors in the pool
func (*ProcessorPool) TotalProcessed ¶
func (p *ProcessorPool) TotalProcessed() int
TotalProcessed returns the sum of all processor ProcessedCount values.
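A pool whose Run call blocks until every processor has finished is commonly built on sync.WaitGroup. The sketch below illustrates that pattern along with Size and TotalProcessed. It is not the package's implementation: pool and worker are hypothetical types, jobs are plain integers, and TotalProcessed is only safe to call after Run returns in this simplified version.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for Processor.
type worker struct{ processed int }

// pool is a hypothetical stand-in for ProcessorPool.
type pool struct {
	mu      sync.Mutex
	workers []*worker
}

// Run starts poolSize workers on the shared queue and blocks until all of
// them have exited, which happens once the queue is closed and drained.
func (p *pool) Run(poolSize int, queue <-chan int) {
	var wg sync.WaitGroup
	for i := 0; i < poolSize; i++ {
		w := &worker{}
		p.mu.Lock()
		p.workers = append(p.workers, w)
		p.mu.Unlock()
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range queue {
				w.processed++ // only this goroutine writes to w
			}
		}()
	}
	wg.Wait()
}

// Size returns the number of workers in the pool.
func (p *pool) Size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.workers)
}

// TotalProcessed sums the per-worker counters.
func (p *pool) TotalProcessed() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	total := 0
	for _, w := range p.workers {
		total += w.processed
	}
	return total
}

func main() {
	queue := make(chan int, 10)
	for i := 0; i < 10; i++ {
		queue <- i
	}
	close(queue)

	p := &pool{}
	p.Run(3, queue)
	fmt.Println("size:", p.Size(), "total processed:", p.TotalProcessed())
}
```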
type ProcessorPoolConfig ¶
type RepositoryPayload ¶
RepositoryPayload contains information about the repository.
type SentryHook ¶ added in v1.3.0
SentryHook delivers logs to a sentry server
func NewSentryHook ¶ added in v1.3.0
func NewSentryHook(DSN string, levels []logrus.Level) (*SentryHook, error)
NewSentryHook creates a hook to be added to an instance of logger and initializes the raven client. This method sets the timeout to 100 milliseconds.
func (*SentryHook) Fire ¶ added in v1.3.0
func (hook *SentryHook) Fire(entry *logrus.Entry) error
Fire is called when an event should be sent to sentry
func (*SentryHook) Levels ¶ added in v1.3.0
func (hook *SentryHook) Levels() []logrus.Level
Levels returns the available logging levels.
type TimeoutsPayload ¶ added in v1.3.0
type TimeoutsPayload struct {
    HardLimit uint64 `json:"hard_limit"`
    LogSilence uint64 `json:"log_silence"`
}
TimeoutsPayload contains information about any custom timeouts. The timeouts are given in seconds, and a value of 0 means no custom timeout is set.
Source Files ¶
- amqp_canceller.go
- amqp_job.go
- amqp_job_queue.go
- amqp_log_writer.go
- build_script_generator.go
- canceller.go
- cli.go
- doc.go
- file_job.go
- file_job_queue.go
- file_log_writer.go
- help.go
- http_job.go
- http_job_queue.go
- http_log_part_sink.go
- http_log_writer.go
- job.go
- job_queue.go
- log_writer.go
- logs_queue.go
- multi_source_job_queue.go
- package.go
- processor.go
- processor_pool.go
- sentry_logrus_hook.go
- step_check_cancellation.go
- step_generate_script.go
- step_open_log_writer.go
- step_run_script.go
- step_send_received.go
- step_sleep.go
- step_start_instance.go
- step_subscribe_cancellation.go
- step_transform_build_json.go
- step_update_state.go
- step_upload_script.go
- step_write_worker_info.go
- version.go
Directories ¶
Path | Synopsis |
---|---|
backend | Package backend provides the compute instance backends supported by Worker. |
cmd | |
cmd/travis-worker | Package main implements the CLI for the travis-worker binary |
config | Package config contains the flags and defaults for Worker configuration. |
context | Package context contains functions to embed data in a context. |
image | Package image contains logic for image selection. |
metrics | Package metrics provides easy methods to send metrics |
ratelimit | Package ratelimit implements a rate limiter to avoid calling APIs too often. |