Overview ¶
Package agent provides the key agent components - workers, worker pool, job runner, log streamer, artifact up/downloaders, etc.
It is intended for internal use of buildkite-agent only.
Index ¶
- Constants
- Variables
- func FetchTags(ctx context.Context, l logger.Logger, conf FetchTagsConfig) []string
- func K8sTagsFromEnv(envn []string) (map[string]string, error)
- func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, ...) (jobRunner, error)
- type APIClient
- type AgentConfiguration
- type AgentPool
- type AgentWorker
- func (a *AgentWorker) AcceptAndRunJob(ctx context.Context, job *api.Job) error
- func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error
- func (a *AgentWorker) Connect(ctx context.Context) error
- func (a *AgentWorker) Disconnect(ctx context.Context) error
- func (a *AgentWorker) Heartbeat(ctx context.Context) error
- func (a *AgentWorker) Ping(ctx context.Context) (*api.Job, error)
- func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job) error
- func (a *AgentWorker) Start(ctx context.Context, idleMonitor *IdleMonitor) error
- func (a *AgentWorker) Stop(graceful bool)
- type AgentWorkerConfig
- type EC2MetaData
- type EC2Tags
- type ECSMetadata
- type FetchTagsConfig
- type GCPLabels
- type GCPMetaData
- type IdleMonitor
- type JobRunner
- type JobRunnerConfig
- type LogStreamer
- type LogStreamerConfig
- type LogWriter
- type PipelineUploader
Constants ¶
const (
	// BuildkiteMessageMax is the maximum length of "BUILDKITE_MESSAGE=...\0"
	// environment entry passed to bootstrap, beyond which it will be truncated
	// to avoid exceeding the system limit. Note that it includes the variable
	// name, equals sign, and null terminator.
	//
	// The true limit varies by system and may be shared with other env/argv
	// data. We'll settle on an arbitrary generous but reasonable value, and
	// adjust it if issues arise.
	//
	// macOS 10.15: 256 KiB shared by environment & argv
	// Linux 4.19: 128 KiB per k=v env
	// Windows 10: 16,384 KiB shared
	// POSIX: 4 KiB minimum shared
	BuildkiteMessageMax = 64 * 1024

	// BuildkiteMessageName is the env var name of the build/commit message.
	BuildkiteMessageName = "BUILDKITE_MESSAGE"

	VerificationBehaviourWarn = "warn"
	VerificationBehaviourBlock = "block"
)
const (
	SignalReasonAgentRefused = "agent_refused"
	SignalReasonAgentStop = "agent_stop"
	SignalReasonCancel = "cancel"
	SignalReasonSignatureRejected = "signature_rejected"
	SignalReasonUnableToVerifySignature = "unable_to_verify_signature"
	SignalReasonProcessRunError = "process_run_error"
)
Variables ¶
var (
	ErrNoSignature = errors.New("job had no signature to verify")
	ErrVerificationFailed = errors.New("signature verification failed")
	ErrInvalidJob = errors.New("job does not match signed step")
)
var ProtectedEnv = map[string]struct{}{
"BUILDKITE_AGENT_ACCESS_TOKEN": {},
"BUILDKITE_AGENT_DEBUG": {},
"BUILDKITE_AGENT_ENDPOINT": {},
"BUILDKITE_AGENT_PID": {},
"BUILDKITE_BIN_PATH": {},
"BUILDKITE_BUILD_PATH": {},
"BUILDKITE_COMMAND_EVAL": {},
"BUILDKITE_CONFIG_PATH": {},
"BUILDKITE_CONTAINER_COUNT": {},
"BUILDKITE_GIT_CLEAN_FLAGS": {},
"BUILDKITE_GIT_CLONE_FLAGS": {},
"BUILDKITE_GIT_CLONE_MIRROR_FLAGS": {},
"BUILDKITE_GIT_FETCH_FLAGS": {},
"BUILDKITE_GIT_MIRRORS_LOCK_TIMEOUT": {},
"BUILDKITE_GIT_MIRRORS_PATH": {},
"BUILDKITE_GIT_MIRRORS_SKIP_UPDATE": {},
"BUILDKITE_GIT_SUBMODULES": {},
"BUILDKITE_HOOKS_PATH": {},
"BUILDKITE_KUBERNETES_EXEC": {},
"BUILDKITE_LOCAL_HOOKS_ENABLED": {},
"BUILDKITE_PLUGINS_ENABLED": {},
"BUILDKITE_PLUGINS_PATH": {},
"BUILDKITE_SHELL": {},
"BUILDKITE_SSH_KEYSCAN": {},
}
Certain environment variables can only be set by agent configuration. If a job sets any of them at the job level, the bootstrap shows the user a warning.
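A minimal sketch of the kind of check this implies, assuming the job's environment is available as a map. `protectedKeysIn` is a hypothetical helper and `protectedEnv` is a hand-copied subset of ProtectedEnv; the real warning is emitted by the bootstrap, whose code is not shown here.

```go
package main

import (
	"fmt"
	"sort"
)

// protectedEnv is a hypothetical subset of the documented ProtectedEnv map.
var protectedEnv = map[string]struct{}{
	"BUILDKITE_AGENT_ACCESS_TOKEN": {},
	"BUILDKITE_BUILD_PATH":         {},
	"BUILDKITE_HOOKS_PATH":         {},
	"BUILDKITE_SHELL":              {},
}

// protectedKeysIn returns, sorted, the keys of jobEnv that only agent
// configuration is allowed to set.
func protectedKeysIn(jobEnv map[string]string) []string {
	var hits []string
	for k := range jobEnv {
		if _, ok := protectedEnv[k]; ok {
			hits = append(hits, k)
		}
	}
	sort.Strings(hits) // map iteration order is random; sort for stable output
	return hits
}

func main() {
	jobEnv := map[string]string{
		"BUILDKITE_SHELL": "/bin/zsh -c",
		"MY_VAR":          "1",
	}
	for _, k := range protectedKeysIn(jobEnv) {
		fmt.Printf("warning: %s can only be set by agent configuration\n", k)
	}
}
```

Using `map[string]struct{}` as a set, as ProtectedEnv does, keeps each lookup O(1) with no per-entry payload.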
Functions ¶
func K8sTagsFromEnv ¶ added in v3.45.0
func K8sTagsFromEnv(envn []string) (map[string]string, error)
func NewJobRunner ¶
func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, conf JobRunnerConfig) (jobRunner, error)
NewJobRunner initializes the job runner.
Types ¶
type APIClient ¶
type APIClient interface {
	AcceptJob(context.Context, *api.Job) (*api.Job, *api.Response, error)
	AcquireJob(context.Context, string, ...api.Header) (*api.Job, *api.Response, error)
	Annotate(context.Context, string, *api.Annotation) (*api.Response, error)
	AnnotationRemove(context.Context, string, string) (*api.Response, error)
	CancelBuild(context.Context, string) (*api.Build, *api.Response, error)
	Config() api.Config
	Connect(context.Context) (*api.Response, error)
	CreateArtifacts(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error)
	Disconnect(context.Context) (*api.Response, error)
	ExistsMetaData(context.Context, string, string, string) (*api.MetaDataExists, *api.Response, error)
	FinishJob(context.Context, *api.Job) (*api.Response, error)
	FromAgentRegisterResponse(*api.AgentRegisterResponse) *api.Client
	FromPing(*api.Ping) *api.Client
	GenerateGithubCodeAccessToken(context.Context, string, string) (string, *api.Response, error)
	GetJobState(context.Context, string) (*api.JobState, *api.Response, error)
	GetMetaData(context.Context, string, string, string) (*api.MetaData, *api.Response, error)
	GetSecret(context.Context, *api.GetSecretRequest) (*api.Secret, *api.Response, error)
	Heartbeat(context.Context) (*api.Heartbeat, *api.Response, error)
	MetaDataKeys(context.Context, string, string) ([]string, *api.Response, error)
	OIDCToken(context.Context, *api.OIDCTokenRequest) (*api.OIDCToken, *api.Response, error)
	Ping(context.Context) (*api.Ping, *api.Response, error)
	PipelineUploadStatus(context.Context, string, string, ...api.Header) (*api.PipelineUploadStatus, *api.Response, error)
	Register(context.Context, *api.AgentRegisterRequest) (*api.AgentRegisterResponse, *api.Response, error)
	SaveHeaderTimes(context.Context, string, *api.HeaderTimes) (*api.Response, error)
	SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error)
	SetMetaData(context.Context, string, *api.MetaData) (*api.Response, error)
	StartJob(context.Context, *api.Job) (*api.Response, error)
	StepExport(context.Context, string, *api.StepExportRequest) (*api.StepExportResponse, *api.Response, error)
	StepUpdate(context.Context, string, *api.StepUpdate) (*api.Response, error)
	UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error)
	UploadChunk(context.Context, string, *api.Chunk) (*api.Response, error)
	UploadPipeline(context.Context, string, *api.PipelineChange, ...api.Header) (*api.Response, error)
}
APIClient is an interface generated for "github.com/buildkite/agent/v3/api.Client".
type AgentConfiguration ¶
type AgentConfiguration struct {
	ConfigPath string
	BootstrapScript string
	BuildPath string
	HooksPath string
	SocketsPath string
	GitMirrorsPath string
	GitMirrorsLockTimeout int
	GitMirrorsSkipUpdate bool
	PluginsPath string
	GitCheckoutFlags string
	GitCloneFlags string
	GitCloneMirrorFlags string
	GitCleanFlags string
	GitFetchFlags string
	GitSubmodules bool
	AllowedRepositories []*regexp.Regexp
	AllowedPlugins []*regexp.Regexp
	AllowedEnvironmentVariables []*regexp.Regexp
	SSHKeyscan bool
	CommandEval bool
	PluginsEnabled bool
	PluginValidation bool
	LocalHooksEnabled bool
	StrictSingleHooks bool
	RunInPty bool
	KubernetesExec bool
	SigningJWKSFile string // Where to find the key to sign pipeline uploads with (passed through to jobs, they might be uploading pipelines)
	SigningJWKSKeyID string // The key ID to sign pipeline uploads with
	SigningAWSKMSKey string // The KMS key ID to sign pipeline uploads with
	DebugSigning bool // Whether to print step payloads when signing them
	VerificationJWKS any // The set of keys to verify jobs with
	VerificationFailureBehaviour string // What to do if job verification fails (one of `block` or `warn`)
	ANSITimestamps bool
	TimestampLines bool
	HealthCheckAddr string
	DisconnectAfterJob bool
	DisconnectAfterIdleTimeout int
	CancelGracePeriod int
	SignalGracePeriod time.Duration
	EnableJobLogTmpfile bool
	JobLogPath string
	WriteJobLogsToStdout bool
	LogFormat string
	Shell string
	Profile string
	RedactedVars []string
	AcquireJob string
	TracingBackend string
	TracingServiceName string
	TraceContextEncoding string
	DisableWarningsFor []string
	AllowMultipartArtifactUpload bool
}
AgentConfiguration is the run-time configuration for an agent, loaded from the config file and command-line parameters.
type AgentPool ¶
type AgentPool struct {
// contains filtered or unexported fields
}
AgentPool manages multiple parallel AgentWorkers
func NewAgentPool ¶
func NewAgentPool(workers []*AgentWorker) *AgentPool
NewAgentPool returns a new AgentPool
func (*AgentPool) StartStatusServer ¶ added in v3.72.0
type AgentWorker ¶
type AgentWorker struct {
// contains filtered or unexported fields
}
func NewAgentWorker ¶
func NewAgentWorker(l logger.Logger, a *api.AgentRegisterResponse, m *metrics.Collector, apiClient APIClient, c AgentWorkerConfig) *AgentWorker
NewAgentWorker creates the agent worker and initializes its API client.
func (*AgentWorker) AcceptAndRunJob ¶
func (a *AgentWorker) AcceptAndRunJob(ctx context.Context, job *api.Job) error
AcceptAndRunJob accepts a job and runs it. It returns an error only if something goes wrong.
func (*AgentWorker) AcquireAndRunJob ¶
func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error
AcquireAndRunJob attempts to acquire a job and run it. If the job is in the waiting state, it retries after the server-determined interval (from the Retry-After response header). If the job is in an unassignable state, it returns an error immediately. Otherwise, it retries every 3 s for up to 30 s. The whole operation times out after 5 minutes.
func (*AgentWorker) Connect ¶
func (a *AgentWorker) Connect(ctx context.Context) error
Connect connects the agent to the Buildkite Agent API, retrying up to 10 times if it fails.
func (*AgentWorker) Disconnect ¶
func (a *AgentWorker) Disconnect(ctx context.Context) error
Disconnect notifies the Buildkite API that this agent worker/session is permanently disconnecting. It does not spend long retrying, because the agent should disconnect as quickly as possible.
func (*AgentWorker) Heartbeat ¶
func (a *AgentWorker) Heartbeat(ctx context.Context) error
Heartbeat performs a heartbeat.
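func (*AgentWorker) Ping ¶
func (a *AgentWorker) Ping(ctx context.Context) (*api.Job, error)
Ping checks Buildkite for a job or action to take. It returns a job, or nil if none is found.
func (*AgentWorker) RunJob ¶
func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job) error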
func (*AgentWorker) Start ¶
func (a *AgentWorker) Start(ctx context.Context, idleMonitor *IdleMonitor) error
Start starts the agent worker.
func (*AgentWorker) Stop ¶
func (a *AgentWorker) Stop(graceful bool)
Stop stops the agent from accepting new work and cancels any work it is currently running.
type AgentWorkerConfig ¶
type AgentWorkerConfig struct {
	// Whether to set debug in the job
	Debug bool

	// Whether to set debugHTTP in the job
	DebugHTTP bool

	// What signal to use for worker cancellation
	CancelSignal process.Signal

	// Time to wait between sending the CancelSignal and SIGKILL to the process
	// groups that the executor starts
	SignalGracePeriod time.Duration

	// The index of this agent worker
	SpawnIndex int

	// The configuration of the agent from the CLI
	AgentConfiguration AgentConfiguration

	// Stdout of the parent agent process. Used for job log stdout writing arg, for simpler containerized log collection.
	AgentStdout io.Writer
}
type EC2MetaData ¶
type EC2MetaData struct { }
type ECSMetadata ¶ added in v3.43.0
type ECSMetadata struct {
DisableHTTP2 bool
}
type FetchTagsConfig ¶
type FetchTagsConfig struct {
	Tags []string
	TagsFromK8s bool
	TagsFromEC2MetaData bool
	TagsFromEC2MetaDataPaths []string
	TagsFromEC2Tags bool
	TagsFromECSMetaData bool
	TagsFromGCPMetaData bool
	TagsFromGCPMetaDataPaths []string
	TagsFromGCPLabels bool
	TagsFromHost bool
	WaitForEC2TagsTimeout time.Duration
	WaitForEC2MetaDataTimeout time.Duration
	WaitForECSMetaDataTimeout time.Duration
	WaitForGCPLabelsTimeout time.Duration
}
type GCPMetaData ¶
type GCPMetaData struct { }
type IdleMonitor ¶
This monitor has a third, implicit state, which we call "initializing". Agents can transition to busy and/or idle, but they always start in the "initializing" state:
                 -> Busy
                /    ^
  Initializing       |
                \    v
                 -> Idle
This ensures (perhaps intentionally) that DisconnectAfterIdleTimeout does not fire before the agents have had a chance to run a job.
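The state diagram above can be modelled by treating "not yet reported" as the initializing state. The sketch below is a hypothetical `idleMonitor` mirroring the documented NewIdleMonitor/MarkBusy/MarkIdle/Idle surface; its internals are an assumption, not the agent's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

type agentState int

const (
	busy agentState = iota + 1 // absence from the map means "initializing"
	idle
)

// idleMonitor is a sketch of the behaviour described above, not the real type.
type idleMonitor struct {
	mu          sync.Mutex
	totalAgents int
	states      map[string]agentState
}

func newIdleMonitor(totalAgents int) *idleMonitor {
	return &idleMonitor{totalAgents: totalAgents, states: make(map[string]agentState)}
}

func (i *idleMonitor) MarkBusy(agentUUID string) { i.set(agentUUID, busy) }
func (i *idleMonitor) MarkIdle(agentUUID string) { i.set(agentUUID, idle) }

func (i *idleMonitor) set(agentUUID string, s agentState) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.states[agentUUID] = s
}

// Idle reports true only once every agent has left "initializing" and is idle.
func (i *idleMonitor) Idle() bool {
	i.mu.Lock()
	defer i.mu.Unlock()
	if len(i.states) < i.totalAgents {
		return false // at least one agent is still initializing
	}
	for _, s := range i.states {
		if s != idle {
			return false
		}
	}
	return true
}

func main() {
	m := newIdleMonitor(2)
	m.MarkIdle("a")
	fmt.Println(m.Idle()) // false: "b" is still initializing
	m.MarkIdle("b")
	fmt.Println(m.Idle()) // true
	m.MarkBusy("a")
	fmt.Println(m.Idle()) // false
}
```

Because an agent that has never reported is simply missing from the map, no explicit "initializing" value is needed, and Idle stays false until every agent has checked in.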
func NewIdleMonitor ¶
func NewIdleMonitor(totalAgents int) *IdleMonitor
func (*IdleMonitor) Idle ¶
func (i *IdleMonitor) Idle() bool
func (*IdleMonitor) MarkBusy ¶
func (i *IdleMonitor) MarkBusy(agentUUID string)
func (*IdleMonitor) MarkIdle ¶
func (i *IdleMonitor) MarkIdle(agentUUID string)
type JobRunner ¶
type JobRunner struct {
	// How the JobRunner should respond when job verification fails (one of `block` or `warn`)
	VerificationFailureBehavior string
	// contains filtered or unexported fields
}
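A hedged sketch of how the `block`/`warn` setting could gate a job after verification fails. `decide` is hypothetical; the constants and error are copied from this page, but reporting SignalReasonSignatureRejected for every blocked job is an assumption, as the documentation does not say which signal reason the runner uses for each failure.

```go
package main

import (
	"errors"
	"fmt"
)

// Copied from the package's documented constants and variables.
const (
	VerificationBehaviourWarn     = "warn"
	VerificationBehaviourBlock    = "block"
	SignalReasonSignatureRejected = "signature_rejected"
)

var ErrVerificationFailed = errors.New("signature verification failed")

// decide is a hypothetical helper: given the result of verifying a job and
// the configured behaviour, it reports whether the job should run and, if
// not, which signal reason to report (that choice is an assumption).
func decide(verifyErr error, behaviour string) (run bool, reason string) {
	if verifyErr == nil {
		return true, ""
	}
	if behaviour == VerificationBehaviourWarn {
		fmt.Printf("warning: %v; running the job anyway\n", verifyErr)
		return true, ""
	}
	// "block", and any unrecognised value, fails closed.
	return false, SignalReasonSignatureRejected
}

func main() {
	fmt.Println(decide(ErrVerificationFailed, VerificationBehaviourBlock)) // false signature_rejected
	fmt.Println(decide(ErrVerificationFailed, VerificationBehaviourWarn))
}
```

Failing closed on an unrecognised behaviour value is a deliberate choice in this sketch: a typo in configuration then blocks jobs rather than silently running unverified ones.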
func (*JobRunner) CancelAndStop ¶ added in v3.21.0
type JobRunnerConfig ¶
type JobRunnerConfig struct {
	// The configuration of the agent from the CLI
	AgentConfiguration AgentConfiguration

	// How often to check if the job has been cancelled
	JobStatusInterval time.Duration

	// The JSON Web Keyset for verifying the job
	JWKS any

	// A scope for metrics within a job
	MetricsScope *metrics.Scope

	// The job to run
	Job *api.Job

	// What signal to use for worker cancellation
	CancelSignal process.Signal

	// Whether to set debug in the job
	Debug bool

	// Whether to set debug HTTP Requests in the job
	DebugHTTP bool

	// Whether the job is executing as a k8s pod
	KubernetesExec bool

	// Stdout of the parent agent process. Used for job log stdout writing arg, for simpler containerized log collection.
	AgentStdout io.Writer
}
type LogStreamer ¶
type LogStreamer struct {
// contains filtered or unexported fields
}
LogStreamer divides job log output into chunks (Process), and log streamer workers (goroutines created by Start) receive and upload those chunks. The actual uploading is performed by the callback.
func NewLogStreamer ¶
func NewLogStreamer(
	agentLogger logger.Logger,
	callback func(context.Context, *api.Chunk) error,
	conf LogStreamerConfig,
) *LogStreamer
NewLogStreamer creates a new instance of the log streamer.
func (*LogStreamer) FailedChunks ¶
func (ls *LogStreamer) FailedChunks() int
func (*LogStreamer) Process ¶
func (ls *LogStreamer) Process(ctx context.Context, output []byte) error
Process streams the output. It returns an error if the output data cannot be processed at all (e.g. the streamer was stopped or a hard limit was reached). Transient failures to upload logs are instead handled in the callback.
type LogStreamerConfig ¶
type LogStreamerConfig struct {
	// How many log streamer workers are running at any one time
	Concurrency int

	// The maximum size of each chunk
	MaxChunkSizeBytes uint64

	// The maximum size of the log
	MaxSizeBytes uint64
}
LogStreamerConfig contains configuration options for the log streamer.
type LogWriter ¶ added in v3.32.0
type LogWriter struct {
// contains filtered or unexported fields
}
type PipelineUploader ¶ added in v3.44.0
type PipelineUploader struct {
	Client APIClient
	Change *api.PipelineChange
	JobID string
	RetrySleepFunc func(time.Duration)
}
PipelineUploader contains the data needed to upload a pipeline to Buildkite
func (*PipelineUploader) Upload ¶ added in v3.45.0
Upload will first attempt to perform an async pipeline upload and, depending on the API's response, it will poll for the upload's status.
There are three relevant routes:
  1. Async Route: /jobs/:job_uuid/pipelines?async=true
  2. Sync Route: /jobs/:job_uuid/pipelines
  3. Status Route: /jobs/:job_uuid/pipelines/:upload_uuid
In this method, the agent first uploads the pipeline to the Async Route. Then, depending on the response, it behaves differently:
  1. The Async Route responds with 202: poll the Status Route until the upload has been "applied".
  2. The Async Route responds with any other 2xx: exit, because the upload succeeded synchronously (possibly after a retry).
  3. The Async Route responds with any other status code: retry uploading the pipeline to the Async Route.
Note that this version of the agent does not use the Sync Route at all. Typically, the Async Route returns 202 whether or not the pipeline upload has been processed.
However, the API has the option to treat the Async Route as if it were the Sync Route by returning a 2xx other than 202, which triggers option 2. While the API does not currently do this, we want to keep the flexibility to do so in the future. If that is implemented, the Status Route will not be polled: either the Async Route will be retried until the API returns a 2xx other than 202, or the method will exit early with no error. Both outcomes are option 2.
If, during a retry loop in option 3, the API returns 202, we assume the API switched to supporting async uploads between retries, and option 1 is taken.
Directories ¶
Path | Synopsis
---|---
plugin | Package plugin provides types for managing agent plugins.