Documentation ¶
Overview ¶
Package worker implements the core of Worker.
The main coordinating component in the core is the Processor, which is a good place to start when exploring the code. It takes jobs off the job queue and invokes the right components to run them. The ProcessorPool starts the number of Processors required to reach the desired concurrency for a single Worker instance.
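Here is a minimal sketch of how these pieces fit together. The import paths (github.com/travis-ci/worker and its backend and config subpackages) are assumptions, the backend provider and log writer factory are elided, and the ProcessorPoolConfig is left at its zero value purely for illustration:

package main

import (
	"log"
	"time"

	"github.com/travis-ci/worker"
	"github.com/travis-ci/worker/backend"
	"github.com/travis-ci/worker/config"
)

func main() {
	cfg := &config.Config{} // a real deployment populates this from flags/env

	var provider backend.Provider   // constructed via the backend package (elided)
	var lwf worker.LogWriterFactory // e.g. from NewAMQPLogWriterFactory (elided)

	generator := worker.NewBuildScriptGenerator(cfg)
	persister := worker.NewBuildTracePersister(cfg)
	broadcaster := worker.NewCancellationBroadcaster()

	// FileJobQueue is the simplest queue to stand up locally.
	queue, err := worker.NewFileJobQueue("/tmp/worker-jobs", "builds", 3*time.Second)
	if err != nil {
		log.Fatal(err)
	}

	pool := worker.NewProcessorPool(&worker.ProcessorPoolConfig{},
		provider, generator, persister, broadcaster)

	// Run blocks until every processor has finished.
	if err := pool.Run(2, queue, lwf); err != nil {
		log.Fatal(err)
	}
}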
Index ¶
- Constants
- Variables
- type AMQPCanceller
- type AMQPJobQueue
- type AMQPLogWriterFactory
- type BuildPayload
- type BuildScriptGenerator
- type BuildScriptGeneratorError
- type BuildTracePersister
- type CLI
- type CancellationBroadcaster
- type EnvVar
- type FileJobQueue
- type FinishState
- type HTTPJobQueue
- type Job
- type JobJobPayload
- type JobMetaPayload
- type JobPayload
- type JobQueue
- type JobStartedMeta
- type LogWriter
- type LogWriterFactory
- type MultiSourceJobQueue
- type Processor
- type ProcessorConfig
- type ProcessorPool
- func (p *ProcessorPool) Decr()
- func (p *ProcessorPool) Each(f func(int, *Processor))
- func (p *ProcessorPool) ExpectedSize() int
- func (p *ProcessorPool) GracefulShutdown(togglePause bool)
- func (p *ProcessorPool) Incr()
- func (p *ProcessorPool) Ready() bool
- func (p *ProcessorPool) Run(poolSize int, queue JobQueue, logWriterFactory LogWriterFactory) error
- func (p *ProcessorPool) SetSize(newSize int)
- func (p *ProcessorPool) Size() int
- func (p *ProcessorPool) TotalProcessed() int
- type ProcessorPoolConfig
- type RemoteController
- func (api *RemoteController) CheckAuth(next http.Handler) http.Handler
- func (api *RemoteController) DecrementPool(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) GetWorkerInfo(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) HealthCheck(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) IncrementPool(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) ReadyCheck(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) SetContext(next http.Handler) http.Handler
- func (api *RemoteController) Setup()
- func (api *RemoteController) ShutdownWorker(w http.ResponseWriter, req *http.Request)
- func (api *RemoteController) UpdateWorkerInfo(w http.ResponseWriter, req *http.Request)
- type RepositoryPayload
- type SentryHook
- type TimeoutsPayload
Constants ¶
const (
	VMTypeDefault = "default"
	VMTypePremium = "premium"
)
Variables ¶
var (
	// LogWriterTick is how often the buffer should be flushed out and sent to
	// travis-logs.
	LogWriterTick = 500 * time.Millisecond

	// LogChunkSize is a bit of a magic number, calculated like this: The
	// maximum Pusher payload is 10 kB (or 10 KiB, who knows, but let's go with
	// 10 kB since that is smaller). Looking at the travis-logs source, the
	// current message overhead (i.e. the part of the payload that isn't the
	// content of the log part) is 42 bytes + the length of the JSON-encoded ID
	// and the length of the JSON-encoded sequence number. A 64-bit number is
	// up to 20 digits long, so that means (assuming we don't go over 64-bit
	// numbers) the overhead is up to 82 bytes. That means we can send up to
	// 9918 bytes of content. However, the JSON-encoded version of a string can
	// be significantly longer than the raw bytes. Worst case that I could find
	// is "<", which with the Go JSON encoder becomes "\u003c" (i.e. six bytes
	// long). So, given a string of just left angle brackets, the string would
	// become six times as long, meaning that the longest string we can take is
	// 1653. We could still get errors if we go over 64-bit numbers, but I find
	// the likeliness of that happening to both the sequence number, the ID,
	// and us maxing out the worst-case logs to be quite unlikely, so I'm
	// willing to live with that. --Sarah
	LogChunkSize = 1653
)
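The arithmetic behind LogChunkSize can be reproduced directly; the constant names below are illustrative, not part of the package:

const (
	maxPayload   = 10000                     // 10 kB Pusher limit
	metaOverhead = 42 + 20 + 20              // fixed overhead plus two 20-digit numbers
	maxContent   = maxPayload - metaOverhead // 9918 bytes left for content
	worstCase    = 6                         // "<" JSON-encodes to the six bytes \u003c
)

var chunk = maxContent / worstCase // 1653, matching LogChunkSize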
var (
	// VersionString is the git describe version set at build time
	VersionString = "?"
	// RevisionString is the git revision set at build time
	RevisionString = "?"
	// RevisionURLString is the full URL to the revision set at build time
	RevisionURLString = "?"
	// GeneratedString is the build date set at build time
	GeneratedString = "?"
	// CopyrightString is the copyright set at build time
	CopyrightString = "?"
)
var JobCancelledError = errors.New("job cancelled")
var LogWriterTimeout = errors.New("log writer timeout")
var MaxLogLengthExceeded = errors.New("maximum log length exceeded")
var VMConfigDefault = backend.VmConfig{GpuCount: 0, GpuType: ""}
Functions ¶
This section is empty.
Types ¶
type AMQPCanceller ¶ added in v1.3.0
type AMQPCanceller struct {
// contains filtered or unexported fields
}
AMQPCanceller is responsible for listening to a command queue on AMQP and dispatching the commands to the right place. Currently the only valid command is the 'cancel job' command.
func NewAMQPCanceller ¶ added in v1.3.0
func NewAMQPCanceller(ctx gocontext.Context, conn *amqp.Connection, cancellationBroadcaster *CancellationBroadcaster) *AMQPCanceller
NewAMQPCanceller creates a new AMQPCanceller. No network traffic occurs until you call Run()
func (*AMQPCanceller) Run ¶ added in v1.3.0
func (d *AMQPCanceller) Run()
Run will make the AMQPCanceller listen to the worker command queue and start dispatching any incoming commands.
type AMQPJobQueue ¶ added in v1.3.0
type AMQPJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
AMQPJobQueue is a JobQueue that uses AMQP
func NewAMQPJobQueue ¶ added in v1.3.0
func NewAMQPJobQueue(conn *amqp.Connection, queue string, stateUpdatePoolSize int, sharded bool) (*AMQPJobQueue, error)
NewAMQPJobQueue creates an AMQPJobQueue backed by the given AMQP connection and connects to the AMQP queue with the given name. The queue is declared in AMQP when this function is called, so an error may be returned if the queue already exists with different attributes than expected.
func (*AMQPJobQueue) Cleanup ¶ added in v1.3.0
func (q *AMQPJobQueue) Cleanup() error
Cleanup closes the underlying AMQP connection
func (*AMQPJobQueue) Jobs ¶ added in v1.3.0
func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs creates a new consumer on the queue and returns a channel on which every Job received from AMQP is sent.
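A sketch of consuming jobs over AMQP, assuming the github.com/streadway/amqp client, a local broker, and illustrative values for the queue name, state update pool size, and sharding flag:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
	log.Fatal(err)
}

queue, err := worker.NewAMQPJobQueue(conn, "builds.linux", 2, false)
if err != nil {
	log.Fatal(err)
}
defer queue.Cleanup()

jobs, err := queue.Jobs(context.Background())
if err != nil {
	log.Fatal(err)
}
for job := range jobs {
	log.Printf("received job %d (%s)", job.Payload().Job.ID, job.Name())
}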
func (*AMQPJobQueue) Name ¶
func (q *AMQPJobQueue) Name() string
Name returns the name of this queue type, wow!
type AMQPLogWriterFactory ¶
type AMQPLogWriterFactory struct {
// contains filtered or unexported fields
}
func NewAMQPLogWriterFactory ¶
func NewAMQPLogWriterFactory(conn *amqp.Connection, sharded bool) (*AMQPLogWriterFactory, error)
func (*AMQPLogWriterFactory) Cleanup ¶
func (l *AMQPLogWriterFactory) Cleanup() error
type BuildPayload ¶
BuildPayload contains information about the build.
type BuildScriptGenerator ¶
A BuildScriptGenerator generates a build script for a given job payload.
func NewBuildScriptGenerator ¶
func NewBuildScriptGenerator(cfg *config.Config) BuildScriptGenerator
NewBuildScriptGenerator creates a generator backed by an HTTP API.
type BuildScriptGeneratorError ¶
type BuildScriptGeneratorError struct {
	// true when this error can be recovered by retrying later
	Recover bool
	// contains filtered or unexported fields
}
A BuildScriptGeneratorError is sometimes used by the Generate method on a BuildScriptGenerator to return more metadata about an error.
type BuildTracePersister ¶
A BuildTracePersister persists a build trace. (duh)
func NewBuildTracePersister ¶
func NewBuildTracePersister(cfg *config.Config) BuildTracePersister
NewBuildTracePersister creates a build trace persister backed by S3
type CLI ¶ added in v1.3.0
type CLI struct {
	Config                  *config.Config
	BuildScriptGenerator    BuildScriptGenerator
	BuildTracePersister     BuildTracePersister
	BackendProvider         backend.Provider
	ProcessorPool           *ProcessorPool
	CancellationBroadcaster *CancellationBroadcaster
	JobQueue                JobQueue
	LogWriterFactory        LogWriterFactory
	// contains filtered or unexported fields
}
CLI is the top level of execution for the whole shebang
type CancellationBroadcaster ¶
type CancellationBroadcaster struct {
// contains filtered or unexported fields
}
A CancellationBroadcaster allows you to subscribe to and unsubscribe from cancellation messages for a given job ID.
func NewCancellationBroadcaster ¶
func NewCancellationBroadcaster() *CancellationBroadcaster
NewCancellationBroadcaster sets up a new cancellation broadcaster with an empty registry.
func (*CancellationBroadcaster) Broadcast ¶
func (cb *CancellationBroadcaster) Broadcast(id uint64)
Broadcast broadcasts a cancellation message to all currently subscribed cancellers.
func (*CancellationBroadcaster) Subscribe ¶
func (cb *CancellationBroadcaster) Subscribe(id uint64) <-chan struct{}
Subscribe will set up a subscription for cancellation messages for the given job ID. When a cancellation message comes in, the returned channel will be closed.
func (*CancellationBroadcaster) Unsubscribe ¶
func (cb *CancellationBroadcaster) Unsubscribe(id uint64, ch <-chan struct{})
Unsubscribe removes an existing subscription for the channel.
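A sketch of the full subscribe/broadcast/unsubscribe cycle:

cb := worker.NewCancellationBroadcaster()

// A processor subscribes before it starts running job 42 ...
ch := cb.Subscribe(42)
go func() {
	<-ch // closed once Broadcast(42) is called
	log.Println("job 42 was cancelled")
}()

// ... and a command listener (e.g. the AMQPCanceller) broadcasts the cancel:
cb.Broadcast(42)

// If the job had finished first, the processor would instead drop its
// subscription:
//	cb.Unsubscribe(42, ch)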
type FileJobQueue ¶ added in v1.3.0
type FileJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
FileJobQueue is a JobQueue that uses directories for input, state, and output
func NewFileJobQueue ¶ added in v1.3.0
func NewFileJobQueue(baseDir, queue string, pollingInterval time.Duration) (*FileJobQueue, error)
NewFileJobQueue creates a *FileJobQueue from a base directory and queue name
func (*FileJobQueue) Cleanup ¶ added in v1.3.0
func (f *FileJobQueue) Cleanup() error
Cleanup is a no-op
func (*FileJobQueue) Jobs ¶ added in v1.3.0
func (f *FileJobQueue) Jobs(ctx gocontext.Context) (<-chan Job, error)
Jobs returns a channel of jobs from the created directory
func (*FileJobQueue) Name ¶
func (q *FileJobQueue) Name() string
Name returns the name of this queue type, wow!
type FinishState ¶
type FinishState string
FinishState is the state that a job finished with (such as pass/fail/etc.). You should not provide a string directly, but use one of the FinishStateX constants defined in this package.
const (
	FinishStatePassed    FinishState = "passed"
	FinishStateFailed    FinishState = "failed"
	FinishStateErrored   FinishState = "errored"
	FinishStateCancelled FinishState = "cancelled"
)
Valid finish states for the FinishState type
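For example, a processor reports a result like this (a sketch; job and ctx are assumed to be in scope):

if err := job.Finish(ctx, worker.FinishStatePassed); err != nil {
	log.Printf("could not report finish state: %v", err)
}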
type HTTPJobQueue ¶
type HTTPJobQueue struct {
DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string
// contains filtered or unexported fields
}
HTTPJobQueue is a JobQueue that uses HTTP
func NewHTTPJobQueue ¶
func NewHTTPJobQueue(jobBoardURL *url.URL, site, providerName, queue string, cb *CancellationBroadcaster) (*HTTPJobQueue, error)
NewHTTPJobQueue creates a new HTTP job queue
func NewHTTPJobQueueWithIntervals ¶
func NewHTTPJobQueueWithIntervals(jobBoardURL *url.URL, site, providerName, queue string, pollInterval, refreshClaimInterval time.Duration, cb *CancellationBroadcaster) (*HTTPJobQueue, error)
NewHTTPJobQueueWithIntervals creates a new http job queue with the specified poll and refresh claim intervals
func (*HTTPJobQueue) Jobs ¶
func (q *HTTPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs consumes new jobs from job-board
func (*HTTPJobQueue) Name ¶
func (q *HTTPJobQueue) Name() string
Name returns the name of this queue type, wow!
type Job ¶
type Job interface {
	Payload() *JobPayload
	RawPayload() *simplejson.Json
	StartAttributes() *backend.StartAttributes
	FinishState() FinishState
	Requeued() bool

	Received(gocontext.Context) error
	Started(gocontext.Context) error
	Error(gocontext.Context, string) error
	Requeue(gocontext.Context) error
	Finish(gocontext.Context, FinishState) error

	LogWriter(gocontext.Context, time.Duration) (LogWriter, error)
	Name() string
	SetupContext(gocontext.Context) gocontext.Context
}
A Job ties together all the elements required for a build job
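A sketch of the lifecycle a processor drives for each job; the actual sequence lives in the step_*.go source files listed below, and error handling is elided here:

ctx := job.SetupContext(context.Background())

_ = job.Received(ctx) // tell upstream the job was picked up
_ = job.Started(ctx)  // ... and that the build has started running

w, _ := job.LogWriter(ctx, 10*time.Second) // 10s write timeout, illustrative
fmt.Fprintln(w, "$ echo hello from the build")
w.Close()

_ = job.Finish(ctx, worker.FinishStatePassed)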
type JobJobPayload ¶
type JobJobPayload struct {
	ID       uint64     `json:"id"`
	Number   string     `json:"number"`
	QueuedAt *time.Time `json:"queued_at"`
}
JobJobPayload contains information about the job.
type JobMetaPayload ¶
type JobMetaPayload struct {
StateUpdateCount uint `json:"state_update_count"`
}
JobMetaPayload contains meta information about the job.
type JobPayload ¶
type JobPayload struct {
	Type       string                 `json:"type"`
	Job        JobJobPayload          `json:"job"`
	Build      BuildPayload           `json:"source"`
	Repository RepositoryPayload      `json:"repository"`
	UUID       string                 `json:"uuid"`
	Config     map[string]interface{} `json:"config"`
	Timeouts   TimeoutsPayload        `json:"timeouts,omitempty"`
	VMType     string                 `json:"vm_type"`
	VMConfig   backend.VmConfig       `json:"vm_config"`
	Meta       JobMetaPayload         `json:"meta"`
	Queue      string                 `json:"queue"`
	Trace      bool                   `json:"trace"`
	Warmer     bool                   `json:"warmer"`
}
JobPayload is the payload we receive over RabbitMQ.
type JobQueue ¶
type JobQueue interface {
	Jobs(gocontext.Context) (<-chan Job, error)
	Name() string
	Cleanup() error
}
JobQueue is the minimal interface needed by a ProcessorPool
type JobStartedMeta ¶
type JobStartedMeta struct {
	QueuedAt *time.Time `json:"queued_at"`
	Repo     string     `json:"repo"`
	Queue    string     `json:"queue"`
	Infra    string     `json:"infra"`
}
JobStartedMeta is metadata that is useful for computing time to first log line downstream, and breaking it down into further dimensions.
type LogWriter ¶
type LogWriter interface {
	io.WriteCloser
	WriteAndClose([]byte) (int, error)
	Timeout() <-chan time.Time
	SetMaxLogLength(int)
	SetJobStarted(meta *JobStartedMeta)
	SetCancelFunc(gocontext.CancelFunc)
	MaxLengthReached() bool
}
LogWriter is primarily an io.Writer that will send all bytes to travis-logs for processing, and also has some utility methods for timeouts and log length limiting. Each LogWriter is tied to a given job, and can be obtained by calling the LogWriter() method on a Job.
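For example, a caller can react to log silence via the Timeout channel (a sketch; w is a LogWriter, and done is assumed to be a channel closed when the build script finishes):

select {
case <-w.Timeout():
	// No log output within the allowed window; report the build as errored.
	_ = job.Finish(ctx, worker.FinishStateErrored)
case <-done:
	// The build finished normally.
}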
type LogWriterFactory ¶
type MultiSourceJobQueue ¶
type MultiSourceJobQueue struct {
// contains filtered or unexported fields
}
func NewMultiSourceJobQueue ¶
func NewMultiSourceJobQueue(queues ...JobQueue) *MultiSourceJobQueue
func (*MultiSourceJobQueue) Cleanup ¶
func (msjq *MultiSourceJobQueue) Cleanup() error
Cleanup runs cleanup for each source queue
func (*MultiSourceJobQueue) Jobs ¶
func (msjq *MultiSourceJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err error)
Jobs returns a Job channel that selects over each source queue Job channel
func (*MultiSourceJobQueue) Name ¶
func (msjq *MultiSourceJobQueue) Name() string
Name builds a name from each source queue name
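A sketch of fanning two queues into one, assuming amqpQueue and fileQueue were built with the constructors documented above:

msjq := worker.NewMultiSourceJobQueue(amqpQueue, fileQueue)
defer msjq.Cleanup() // runs cleanup for each source queue

jobs, err := msjq.Jobs(context.Background())
if err != nil {
	log.Fatal(err)
}
for job := range jobs {
	log.Printf("job %s via %s", job.Name(), msjq.Name())
}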
type Processor ¶
type Processor struct {
	ID string

	// ProcessedCount contains the number of jobs that have been processed
	// by this Processor. This value should not be modified outside of the
	// Processor.
	ProcessedCount int

	// CurrentStatus contains the current status of the processor, and can
	// be one of "new", "waiting", "processing" or "done".
	CurrentStatus string

	// LastJobID contains the ID of the last job the processor processed.
	LastJobID uint64

	// contains filtered or unexported fields
}
A Processor gets jobs off the job queue and coordinates with other components to run them.
func NewProcessor ¶
func NewProcessor(ctx gocontext.Context, hostname string, queue JobQueue, logWriterFactory LogWriterFactory, provider backend.Provider, generator BuildScriptGenerator, persister BuildTracePersister, cancellationBroadcaster *CancellationBroadcaster, config ProcessorConfig) (*Processor, error)
NewProcessor creates a new processor that will run the build jobs on the given channel using the given provider and getting build scripts from the generator.
func (*Processor) GracefulShutdown ¶
func (p *Processor) GracefulShutdown()
GracefulShutdown tells the processor to finish the job it is currently processing, but not to pick up any new jobs. This method returns immediately; the processor is done when Run() returns.
type ProcessorConfig ¶
type ProcessorPool ¶
type ProcessorPool struct {
	Context                 gocontext.Context
	Provider                backend.Provider
	Generator               BuildScriptGenerator
	Persister               BuildTracePersister
	CancellationBroadcaster *CancellationBroadcaster

	Hostname string
	Config   *config.Config

	// contains filtered or unexported fields
}
A ProcessorPool spins up multiple Processors handling build jobs from the same queue.
func NewProcessorPool ¶
func NewProcessorPool(ppc *ProcessorPoolConfig, provider backend.Provider, generator BuildScriptGenerator, persister BuildTracePersister, cancellationBroadcaster *CancellationBroadcaster) *ProcessorPool
NewProcessorPool creates a new processor pool using the given arguments.
func (*ProcessorPool) Decr ¶ added in v1.3.0
func (p *ProcessorPool) Decr()
Decr pops a processor out of the pool and issues a graceful shutdown
func (*ProcessorPool) Each ¶ added in v1.3.0
func (p *ProcessorPool) Each(f func(int, *Processor))
Each loops through all the processors in the pool and calls the given function for each of them, passing in the index and the processor. The order of the processors is the same for the same set of processors.
func (*ProcessorPool) ExpectedSize ¶
func (p *ProcessorPool) ExpectedSize() int
ExpectedSize returns the size of the pool once gracefully shutdown processors complete.
After calling SetSize, ExpectedSize will soon reflect the requested new size, while Size will include processors that are still processing their last job before shutting down.
func (*ProcessorPool) GracefulShutdown ¶
func (p *ProcessorPool) GracefulShutdown(togglePause bool)
GracefulShutdown causes each processor in the pool to start its graceful shutdown.
func (*ProcessorPool) Incr ¶ added in v1.3.0
func (p *ProcessorPool) Incr()
Incr adds a single running processor to the pool
func (*ProcessorPool) Ready ¶
func (p *ProcessorPool) Ready() bool
Ready returns true if the processor pool is running as expected. Returns false if the processor pool has not been started yet.
func (*ProcessorPool) Run ¶
func (p *ProcessorPool) Run(poolSize int, queue JobQueue, logWriterFactory LogWriterFactory) error
Run starts up a number of processors and connects them to the given queue. This method blocks until all processors have finished.
func (*ProcessorPool) SetSize ¶
func (p *ProcessorPool) SetSize(newSize int)
SetSize adjusts the pool to run the given number of processors.
This operates in an eventually consistent manner. Because some processors may still be running jobs, the pool size cannot always be adjusted immediately. Once those jobs finish, the pool size settles at the given value.
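For example, shrinking a running pool (a sketch; pool is a *ProcessorPool that was started with Run and is currently running four processors):

pool.SetSize(2)

fmt.Println(pool.ExpectedSize()) // 2 immediately
fmt.Println(pool.Size())         // may still be 4 while retiring processors finish their jobs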
func (*ProcessorPool) Size ¶ added in v1.3.0
func (p *ProcessorPool) Size() int
Size returns the number of processors that are currently running.
This includes processors that are in the process of gracefully shutting down. It's important to track these because they are still running jobs and thus still using resources that need to be managed and tracked.
func (*ProcessorPool) TotalProcessed ¶
func (p *ProcessorPool) TotalProcessed() int
TotalProcessed returns the sum of all processor ProcessedCount values.
type ProcessorPoolConfig ¶
type RemoteController ¶
type RemoteController struct {
// contains filtered or unexported fields
}
RemoteController provides an HTTP API for controlling worker.
func (*RemoteController) CheckAuth ¶
func (api *RemoteController) CheckAuth(next http.Handler) http.Handler
CheckAuth is a middleware for all HTTP API methods that ensures that the configured basic auth credentials were passed in the request.
func (*RemoteController) DecrementPool ¶
func (api *RemoteController) DecrementPool(w http.ResponseWriter, req *http.Request)
DecrementPool tells the worker to gracefully shutdown a processor.
func (*RemoteController) GetWorkerInfo ¶
func (api *RemoteController) GetWorkerInfo(w http.ResponseWriter, req *http.Request)
GetWorkerInfo writes a JSON payload with useful information about the current state of worker as a whole.
func (*RemoteController) HealthCheck ¶
func (api *RemoteController) HealthCheck(w http.ResponseWriter, req *http.Request)
HealthCheck indicates whether worker is currently functioning in a healthy way. This can be used by a system like Kubernetes to determine whether to replace an instance of worker with a new one.
func (*RemoteController) IncrementPool ¶
func (api *RemoteController) IncrementPool(w http.ResponseWriter, req *http.Request)
IncrementPool tells the worker to spin up another processor.
func (*RemoteController) ReadyCheck ¶
func (api *RemoteController) ReadyCheck(w http.ResponseWriter, req *http.Request)
ReadyCheck indicates whether the worker is ready to receive requests. This is intended to be used as a readiness check in a system like Kubernetes. We should not attempt to interact with the remote controller until this returns a successful status.
func (*RemoteController) SetContext ¶
func (api *RemoteController) SetContext(next http.Handler) http.Handler
SetContext is a middleware function that loads some values into the request context. This allows these values to be shown in logging.
func (*RemoteController) Setup ¶
func (api *RemoteController) Setup()
Setup installs the HTTP routes that will handle requests to the HTTP API.
func (*RemoteController) ShutdownWorker ¶
func (api *RemoteController) ShutdownWorker(w http.ResponseWriter, req *http.Request)
ShutdownWorker tells the worker to shutdown.
Options can be passed in the body that determine whether the shutdown is done gracefully or not.
func (*RemoteController) UpdateWorkerInfo ¶
func (api *RemoteController) UpdateWorkerInfo(w http.ResponseWriter, req *http.Request)
UpdateWorkerInfo allows reconfiguring some parts of worker on the fly.
The main use of this is adjusting the size of the processor pool without interrupting existing running jobs.
type RepositoryPayload ¶
RepositoryPayload contains information about the repository.
type SentryHook ¶ added in v1.3.0
SentryHook delivers logs to a Sentry server
func NewSentryHook ¶ added in v1.3.0
func NewSentryHook(DSN string, levels []logrus.Level) (*SentryHook, error)
NewSentryHook creates a hook to be added to an instance of logger and initializes the raven client. This method sets the timeout to 100 milliseconds.
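A sketch of attaching the hook to logrus; the DSN is illustrative:

hook, err := worker.NewSentryHook(
	"https://key@sentry.example.com/1",
	[]logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel},
)
if err != nil {
	log.Fatal(err)
}
logrus.AddHook(hook) // or logger.Hooks.Add(hook) for a specific logger instance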
func (*SentryHook) Fire ¶ added in v1.3.0
func (hook *SentryHook) Fire(entry *logrus.Entry) error
Fire is called when an event should be sent to Sentry
func (*SentryHook) Levels ¶ added in v1.3.0
func (hook *SentryHook) Levels() []logrus.Level
Levels returns the available logging levels.
type TimeoutsPayload ¶ added in v1.3.0
type TimeoutsPayload struct {
	HardLimit  uint64 `json:"hard_limit"`
	LogSilence uint64 `json:"log_silence"`
}
TimeoutsPayload contains information about any custom timeouts. The timeouts are given in seconds, and a value of 0 means no custom timeout is set.
Source Files ¶
- amqp_canceller.go
- amqp_job.go
- amqp_job_queue.go
- amqp_log_writer.go
- amqp_log_writer_factory.go
- build_script_generator.go
- build_trace_persister.go
- canceller.go
- cli.go
- doc.go
- file_job.go
- file_job_queue.go
- file_log_writer.go
- help.go
- http_job.go
- http_job_queue.go
- http_log_part_sink.go
- http_log_writer.go
- job.go
- job_queue.go
- log_writer.go
- log_writer_factory.go
- multi_source_job_queue.go
- package.go
- processor.go
- processor_pool.go
- remote_controller.go
- sentry_logrus_hook.go
- step_check_cancellation.go
- step_download_trace.go
- step_generate_script.go
- step_open_log_writer.go
- step_run_script.go
- step_send_received.go
- step_sleep.go
- step_start_instance.go
- step_subscribe_cancellation.go
- step_transform_build_json.go
- step_update_state.go
- step_upload_script.go
- step_write_worker_info.go
- version.go
Directories ¶
Path | Synopsis
---|---
backend | Package backend provides the compute instance backends supported by Worker.
cmd |
cmd/travis-worker | Package main implements the CLI for the travis-worker binary.
config | Package config contains the flags and defaults for Worker configuration.
context | Package context contains functions to embed data in a context.
image | Package image contains image selection logic.
metrics | Package metrics provides easy methods to send metrics.
ratelimit | Package ratelimit implements a rate limiter to avoid calling APIs too often.