Documentation ¶
Index ¶
- Constants
- Variables
- type BulkAddJobHandler
- type CloneRepoJobHandler
- type CopySourceJobHandler
- type CreateRepoJobHandler
- type DeleteRepoJobHandler
- type DeltaJobHandler
- type DeltaRepoJobHandler
- type IndexRecord
- type IndexRepoJobHandler
- type JobEntry
- func Deserialize(serial []byte) (*JobEntry, error)
- func NewBulkAddJob(id string, pkgs []string) *JobEntry
- func NewCloneRepoJob(repoID, newClone string, cloneAll bool) *JobEntry
- func NewCopySourceJob(repoID, target, source string, release int) *JobEntry
- func NewCreateRepoJob(id string) *JobEntry
- func NewDeleteRepoJob(id string) *JobEntry
- func NewDeltaIndexJob(repoID, packageID string) *JobEntry
- func NewDeltaJob(repoID, packageID string) *JobEntry
- func NewDeltaRepoJob(id string) *JobEntry
- func NewIndexRepoJob(id string) *JobEntry
- func NewPullRepoJob(sourceID, targetID string) *JobEntry
- func NewRemoveSourceJob(repoID, source string, release int) *JobEntry
- func NewTransitJob(path string) *JobEntry
- func NewTrimObsoleteJob(id string) *JobEntry
- func NewTrimPackagesJob(repoID string, maxKeep int) *JobEntry
- type JobFetcher
- type JobHandler
- type JobReaper
- type JobStore
- func (s *JobStore) ActiveJobs() ([]*libferry.Job, error)
- func (s *JobStore) ClaimAsyncJob() (*JobEntry, error)
- func (s *JobStore) ClaimSequentialJob() (*JobEntry, error)
- func (s *JobStore) Close()
- func (s *JobStore) CompletedJobs() ([]*libferry.Job, error)
- func (s *JobStore) FailedJobs() ([]*libferry.Job, error)
- func (s *JobStore) PushAsyncJob(j *JobEntry) error
- func (s *JobStore) PushSequentialJob(j *JobEntry) error
- func (s *JobStore) ResetCompleted() error
- func (s *JobStore) ResetFailed() error
- func (s *JobStore) RetireAsyncJob(j *JobEntry) error
- func (s *JobStore) RetireSequentialJob(j *JobEntry) error
- func (s *JobStore) UnclaimAsync() error
- func (s *JobStore) UnclaimSequential() error
- type JobType
- type Processor
- type PullRepoJobHandler
- type RemoveSourceJobHandler
- type TransitJobHandler
- type TrimObsoleteJobHandler
- type TrimPackagesJobHandler
- type Worker
Constants ¶
const ( // BulkAdd is a sequential job which will attempt to add all of the packages BulkAdd JobType = "BulkAdd" // CopySource is a sequential job to copy from one repo to another CopySource = "CopySource" // CloneRepo is a sequential job which will attempt to clone a repo CloneRepo = "CloneRepo" // CreateRepo is a sequential job which will attempt to create a new repo CreateRepo = "CreateRepo" // DeleteRepo is a sequential job which will attempt to delete a repository DeleteRepo = "DeleteRepo" // Delta is a parallel job which will attempt the construction of deltas for // a given package name + repo Delta = "Delta" // DeltaIndex is created in response to transit manifest events, and will // cause the repository to be reindexed after each delta job continues DeltaIndex = "Delta+Index" // DeltaRepo is a sequential job which creates Delta jobs for every package in // a repo DeltaRepo = "DeltaRepo" // IndexRepo is a sequential job that requests the repository be re-indexed IndexRepo = "IndexRepo" // PullRepo is a sequential job that will attempt to pull a repo PullRepo = "PullRepo" // RemoveSource is a sequential job that will attempt removal of packages RemoveSource = "RemoveSource" // TransitProcess is a sequential job that will process the incoming uploads // directory, dealing with each .tram upload TransitProcess = "TransitProcess" // TrimObsolete is a sequential job to permanently remove obsolete packages // from a repo TrimObsolete = "TrimObsolete" // TrimPackages is a sequential job to trim fat from a repository TrimPackages = "TrimPackages" )
const MaxJitter int64 = 512
MaxJitter sets the upper limit on the random jitter used for retry times
const (
// MaxJobsStored is the maximum amount of jobs we can store before rotating
MaxJobsStored = 100
)
const MinWait = time.Second * 2
MinWait is the minimum amount of time between retries for a worker
Variables ¶
var ( // BucketAsyncJobs holds all asynchronous jobs BucketAsyncJobs = []byte("Async") // BucketSequentialJobs holds all sequential jobs BucketSequentialJobs = []byte("Sync") // BucketSuccessJobs contains jobs that have completed successfully BucketSuccessJobs = []byte("CompletedSuccess") // BucketFailJobs contains jobs that completed with failure BucketFailJobs = []byte("CompletedFailure") // ErrEmptyQueue is returned to indicate a job is not available yet ErrEmptyQueue = errors.New("Queue is empty") // ErrBreakLoop is used only to break the foreach internally. ErrBreakLoop = errors.New("loop breaker") // BucketRecord is used as a subbucket for records BucketRecord = []byte("Record") // IndexRecordKey is used in the job store to mark the next write location IndexRecordKey = []byte("IndexRecord00") )
Functions ¶
This section is empty.
Types ¶
type BulkAddJobHandler ¶
type BulkAddJobHandler struct {
// contains filtered or unexported fields
}
BulkAddJobHandler is responsible for adding packages in bulk to a repository and should only ever be used in sequential queues.
func NewBulkAddJobHandler ¶
func NewBulkAddJobHandler(j *JobEntry) (*BulkAddJobHandler, error)
NewBulkAddJobHandler will create a job handler for the input job and ensure it validates
func (*BulkAddJobHandler) Describe ¶
func (j *BulkAddJobHandler) Describe() string
Describe returns a human readable description for this job
type CloneRepoJobHandler ¶
type CloneRepoJobHandler struct {
// contains filtered or unexported fields
}
CloneRepoJobHandler is responsible for cloning an existing repository
func NewCloneRepoJobHandler ¶
func NewCloneRepoJobHandler(j *JobEntry) (*CloneRepoJobHandler, error)
NewCloneRepoJobHandler will create a job handler for the input job and ensure it validates
func (*CloneRepoJobHandler) Describe ¶
func (j *CloneRepoJobHandler) Describe() string
Describe returns a human readable description for this job
type CopySourceJobHandler ¶
type CopySourceJobHandler struct {
// contains filtered or unexported fields
}
CopySourceJobHandler is responsible for copying packages by source from one repo to another
func NewCopySourceJobHandler ¶
func NewCopySourceJobHandler(j *JobEntry) (*CopySourceJobHandler, error)
NewCopySourceJobHandler will create a job handler for the input job and ensure it validates
func (*CopySourceJobHandler) Describe ¶
func (j *CopySourceJobHandler) Describe() string
Describe returns a human readable description for this job
type CreateRepoJobHandler ¶
type CreateRepoJobHandler struct {
// contains filtered or unexported fields
}
CreateRepoJobHandler is responsible for creating new repositories and should only ever be used in sequential queues.
func NewCreateRepoJobHandler ¶
func NewCreateRepoJobHandler(j *JobEntry) (*CreateRepoJobHandler, error)
NewCreateRepoJobHandler will create a job handler for the input job and ensure it validates
func (*CreateRepoJobHandler) Describe ¶
func (j *CreateRepoJobHandler) Describe() string
Describe returns a human readable description for this job
type DeleteRepoJobHandler ¶
type DeleteRepoJobHandler struct {
// contains filtered or unexported fields
}
DeleteRepoJobHandler is responsible for deleting repositories and should only ever be used in sequential queues.
func NewDeleteRepoJobHandler ¶
func NewDeleteRepoJobHandler(j *JobEntry) (*DeleteRepoJobHandler, error)
NewDeleteRepoJobHandler will create a job handler for the input job and ensure it validates
func (*DeleteRepoJobHandler) Describe ¶
func (j *DeleteRepoJobHandler) Describe() string
Describe returns a human readable description for this job
type DeltaJobHandler ¶
type DeltaJobHandler struct {
// contains filtered or unexported fields
}
DeltaJobHandler is responsible for constructing package deltas and should only ever be used in async queues. Deltas may take some time to produce and shouldn't be allowed to block the sequential processing queue.
func NewDeltaJobHandler ¶
func NewDeltaJobHandler(j *JobEntry, indexRepo bool) (*DeltaJobHandler, error)
NewDeltaJobHandler will create a job handler for the input job and ensure it validates
func (*DeltaJobHandler) Describe ¶
func (j *DeltaJobHandler) Describe() string
Describe returns a human readable description for this job
type DeltaRepoJobHandler ¶
type DeltaRepoJobHandler struct {
// contains filtered or unexported fields
}
DeltaRepoJobHandler is responsible for delta'ing repositories and should only ever be used in sequential queues.
func NewDeltaRepoJobHandler ¶
func NewDeltaRepoJobHandler(j *JobEntry) (*DeltaRepoJobHandler, error)
NewDeltaRepoJobHandler will create a job handler for the input job and ensure it validates
func (*DeltaRepoJobHandler) Describe ¶
func (j *DeltaRepoJobHandler) Describe() string
Describe returns a human readable description for this job
func (*DeltaRepoJobHandler) Execute ¶
func (j *DeltaRepoJobHandler) Execute(jproc *Processor, manager *core.Manager) error
Execute will delta the given repository if possible. Note that it will NOT index the repository; this is a separate step, as it takes a significant amount of time to fully produce all initial deltas.
This operation is ideally only used after the first import of a repository; after that, deltas will be produced on the fly.
type IndexRecord ¶
type IndexRecord struct {
Index uint64
}
IndexRecord is just a simple helper to store the index record.
type IndexRepoJobHandler ¶
type IndexRepoJobHandler struct {
// contains filtered or unexported fields
}
IndexRepoJobHandler is responsible for indexing repositories and should only ever be used in sequential queues.
func NewIndexRepoJobHandler ¶
func NewIndexRepoJobHandler(j *JobEntry) (*IndexRepoJobHandler, error)
NewIndexRepoJobHandler will create a job handler for the input job and ensure it validates
func (*IndexRepoJobHandler) Describe ¶
func (j *IndexRepoJobHandler) Describe() string
Describe returns a human readable description for this job
type JobEntry ¶
type JobEntry struct { Type JobType Claimed bool Params []string Timing libferry.TimingInformation // Store all timing information // contains filtered or unexported fields }
JobEntry is an entry in the JobQueue
func Deserialize ¶
Deserialize uses Gob decoding to convert a byte slice to a JobEntry
func NewBulkAddJob ¶
NewBulkAddJob will return a job suitable for adding to the job processor
func NewCloneRepoJob ¶
NewCloneRepoJob will return a job suitable for adding to the job processor
func NewCopySourceJob ¶
NewCopySourceJob will return a job suitable for adding to the job processor
func NewCreateRepoJob ¶
NewCreateRepoJob will return a job suitable for adding to the job processor
func NewDeleteRepoJob ¶
NewDeleteRepoJob will return a job suitable for adding to the job processor
func NewDeltaIndexJob ¶
NewDeltaIndexJob will return a new job for creating delta packages as well as scheduling an index operation when complete.
func NewDeltaJob ¶
NewDeltaJob will return a job suitable for adding to the job processor
func NewDeltaRepoJob ¶
NewDeltaRepoJob will return a job suitable for adding to the job processor
func NewIndexRepoJob ¶
NewIndexRepoJob will return a job suitable for adding to the job processor
func NewPullRepoJob ¶
NewPullRepoJob will return a job suitable for adding to the job processor
func NewRemoveSourceJob ¶
NewRemoveSourceJob will return a job suitable for adding to the job processor
func NewTransitJob ¶
NewTransitJob will return a job suitable for adding to the job processor
func NewTrimObsoleteJob ¶
NewTrimObsoleteJob will return a job suitable for adding to the job processor
func NewTrimPackagesJob ¶
NewTrimPackagesJob will return a job suitable for adding to the job processor
type JobFetcher ¶
JobFetcher will be provided by either the Async or Sequential claim functions
type JobHandler ¶
type JobHandler interface { // Execute will attempt to execute the given job Execute(proc *Processor, m *core.Manager) error // Describe will return an appropriate description for the job Describe() string }
A JobHandler is created for each JobEntry, to provide specialised handling of the job type
func NewJobHandler ¶
func NewJobHandler(j *JobEntry) (JobHandler, error)
NewJobHandler will return a handler that is loaded only during the execution of a previously serialised job
type JobStore ¶
type JobStore struct {
// contains filtered or unexported fields
}
JobStore handles the storage and manipulation of incomplete jobs
func (*JobStore) ActiveJobs ¶
ActiveJobs will attempt to return a list of active jobs within the scheduler suitable for consumption by the CLI client
func (*JobStore) ClaimAsyncJob ¶
ClaimAsyncJob gets the first available asynchronous job, if one exists
func (*JobStore) ClaimSequentialJob ¶
ClaimSequentialJob gets the first available synchronous job, if one exists
func (*JobStore) CompletedJobs ¶
CompletedJobs will return all successfully completed jobs still stored
func (*JobStore) FailedJobs ¶
FailedJobs will return all failed jobs that are still stored
func (*JobStore) PushAsyncJob ¶
PushAsyncJob will enqueue a new asynchronous job
func (*JobStore) PushSequentialJob ¶
PushSequentialJob will enqueue a new sequential job
func (*JobStore) ResetCompleted ¶
ResetCompleted will remove all completion records from our store and reset the pointer
func (*JobStore) ResetFailed ¶
ResetFailed will remove all fail records from our store and reset the pointer
func (*JobStore) RetireAsyncJob ¶
RetireAsyncJob removes a completed asynchronous job
func (*JobStore) RetireSequentialJob ¶
RetireSequentialJob removes a completed synchronous job
func (*JobStore) UnclaimAsync ¶
UnclaimAsync will find all claimed async jobs and unclaim them again
func (*JobStore) UnclaimSequential ¶
UnclaimSequential will find all claimed sequential jobs and unclaim them again
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
A Processor is responsible for the main dispatch and bulking of jobs to ensure they're handled in the most optimal fashion.
func NewProcessor ¶
NewProcessor will return a new Processor with the specified number of jobs. Note that "njobs" only refers to the number of *background jobs*; the majority of operations will run sequentially.
func (*Processor) Begin ¶
func (j *Processor) Begin()
Begin will start the main job processor in parallel
type PullRepoJobHandler ¶
type PullRepoJobHandler struct {
// contains filtered or unexported fields
}
PullRepoJobHandler is responsible for pulling from a source repository into a target repository
func NewPullRepoJobHandler ¶
func NewPullRepoJobHandler(j *JobEntry) (*PullRepoJobHandler, error)
NewPullRepoJobHandler will create a job handler for the input job and ensure it validates
func (*PullRepoJobHandler) Describe ¶
func (j *PullRepoJobHandler) Describe() string
Describe returns a human readable description for this job
type RemoveSourceJobHandler ¶
type RemoveSourceJobHandler struct {
// contains filtered or unexported fields
}
RemoveSourceJobHandler is responsible for removing packages by identifiers
func NewRemoveSourceJobHandler ¶
func NewRemoveSourceJobHandler(j *JobEntry) (*RemoveSourceJobHandler, error)
NewRemoveSourceJobHandler will create a job handler for the input job and ensure it validates
func (*RemoveSourceJobHandler) Describe ¶
func (j *RemoveSourceJobHandler) Describe() string
Describe returns a human readable description for this job
type TransitJobHandler ¶
type TransitJobHandler struct {
// contains filtered or unexported fields
}
TransitJobHandler is responsible for accepting new upload payloads in the repository
func NewTransitJobHandler ¶
func NewTransitJobHandler(j *JobEntry) (*TransitJobHandler, error)
NewTransitJobHandler will create a job handler for the input job and ensure it validates
func (*TransitJobHandler) Describe ¶
func (j *TransitJobHandler) Describe() string
Describe returns a human readable description for this job
type TrimObsoleteJobHandler ¶
type TrimObsoleteJobHandler struct {
// contains filtered or unexported fields
}
TrimObsoleteJobHandler is responsible for permanently removing obsolete packages from a repository and should only ever be used in sequential queues.
func NewTrimObsoleteJobHandler ¶
func NewTrimObsoleteJobHandler(j *JobEntry) (*TrimObsoleteJobHandler, error)
NewTrimObsoleteJobHandler will create a job handler for the input job and ensure it validates
func (*TrimObsoleteJobHandler) Describe ¶
func (j *TrimObsoleteJobHandler) Describe() string
Describe returns a human readable description for this job
type TrimPackagesJobHandler ¶
type TrimPackagesJobHandler struct {
// contains filtered or unexported fields
}
TrimPackagesJobHandler is responsible for trimming excess packages from a repository
func NewTrimPackagesJobHandler ¶
func NewTrimPackagesJobHandler(j *JobEntry) (*TrimPackagesJobHandler, error)
NewTrimPackagesJobHandler will create a job handler for the input job and ensure it validates
func (*TrimPackagesJobHandler) Describe ¶
func (j *TrimPackagesJobHandler) Describe() string
Describe returns a human readable description for this job
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
A Worker is used to execute some portion of the incoming workload, and will keep polling for the correct job type to process
func NewWorkerAsync ¶
NewWorkerAsync will return an asynchronous processing worker which will only pull from the store's async job queue
func NewWorkerSequential ¶
NewWorkerSequential will return a sequential worker operating on the main sequential job loop