jobs

package
v0.0.0-...-eb887e7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// BulkAdd is a sequential job which will attempt to add all of the packages
	BulkAdd JobType = "BulkAdd"

	// CopySource is a sequential job to copy from one repo to another
	CopySource = "CopySource"

	// CloneRepo is a sequential job which will attempt to clone a repo
	CloneRepo = "CloneRepo"

	// CreateRepo is a sequential job which will attempt to create a new repo
	CreateRepo = "CreateRepo"

	// DeleteRepo is a sequential job which will attempt to delete a repository
	DeleteRepo = "DeleteRepo"

	// Delta is a parallel job which will attempt the construction of deltas for
	// a given package name + repo
	Delta = "Delta"

	// DeltaIndex is created in response to transit manifest events, and will
	// cause the repository to be reindexed after each delta job completes
	DeltaIndex = "Delta+Index"

	// DeltaRepo is a sequential job which creates Delta jobs for every package in
	// a repo
	DeltaRepo = "DeltaRepo"

	// IndexRepo is a sequential job that requests the repository be re-indexed
	IndexRepo = "IndexRepo"

	// PullRepo is a sequential job that will attempt to pull a repo
	PullRepo = "PullRepo"

	// RemoveSource is a sequential job that will attempt removal of packages
	RemoveSource = "RemoveSource"

	// TransitProcess is a sequential job that will process the incoming uploads
	// directory, dealing with each .tram upload
	TransitProcess = "TransitProcess"

	// TrimObsolete is a sequential job to permanently remove obsolete packages
	// from a repo
	TrimObsolete = "TrimObsolete"

	// TrimPackages is a sequential job to trim fat from a repository
	TrimPackages = "TrimPackages"
)
View Source
const MaxJitter int64 = 512

MaxJitter sets the upper limit on the random jitter used for retry times

View Source
const (
	// MaxJobsStored is the maximum amount of jobs we can store before rotating
	MaxJobsStored = 100
)
View Source
const MinWait = time.Second * 2

MinWait is the minimum amount of time between retries for a worker

Variables

View Source
var (
	// BucketAsyncJobs holds all asynchronous jobs
	BucketAsyncJobs = []byte("Async")

	// BucketSequentialJobs holds all sequential jobs
	BucketSequentialJobs = []byte("Sync")

	// BucketSuccessJobs contains jobs that have completed successfully
	BucketSuccessJobs = []byte("CompletedSuccess")

	// BucketFailJobs contains jobs that completed with failure
	BucketFailJobs = []byte("CompletedFailure")

	// ErrEmptyQueue is returned to indicate a job is not available yet
	ErrEmptyQueue = errors.New("Queue is empty")

	// ErrBreakLoop is used only to break the foreach internally.
	ErrBreakLoop = errors.New("loop breaker")

	// BucketRecord is used as a subbucket for records
	BucketRecord = []byte("Record")

	// IndexRecordKey is used in the job store to mark the next write location
	IndexRecordKey = []byte("IndexRecord00")
)

Functions

This section is empty.

Types

type BulkAddJobHandler

type BulkAddJobHandler struct {
	// contains filtered or unexported fields
}

BulkAddJobHandler is responsible for the mass-import of packages and should only ever be used in sequential queues.

func NewBulkAddJobHandler

func NewBulkAddJobHandler(j *JobEntry) (*BulkAddJobHandler, error)

NewBulkAddJobHandler will create a job handler for the input job and ensure it validates

func (*BulkAddJobHandler) Describe

func (j *BulkAddJobHandler) Describe() string

Describe returns a human readable description for this job

func (*BulkAddJobHandler) Execute

func (j *BulkAddJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will attempt the mass-import of packages passed to the job

type CloneRepoJobHandler

type CloneRepoJobHandler struct {
	// contains filtered or unexported fields
}

CloneRepoJobHandler is responsible for cloning an existing repository

func NewCloneRepoJobHandler

func NewCloneRepoJobHandler(j *JobEntry) (*CloneRepoJobHandler, error)

NewCloneRepoJobHandler will create a job handler for the input job and ensure it validates

func (*CloneRepoJobHandler) Describe

func (j *CloneRepoJobHandler) Describe() string

Describe returns a human readable description for this job

func (*CloneRepoJobHandler) Execute

func (j *CloneRepoJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will attempt to clone the repoID to newClone, optionally at full depth

type CopySourceJobHandler

type CopySourceJobHandler struct {
	// contains filtered or unexported fields
}

CopySourceJobHandler is responsible for copying packages by identifiers from one repo to another

func NewCopySourceJobHandler

func NewCopySourceJobHandler(j *JobEntry) (*CopySourceJobHandler, error)

NewCopySourceJobHandler will create a job handler for the input job and ensure it validates

func (*CopySourceJobHandler) Describe

func (j *CopySourceJobHandler) Describe() string

Describe returns a human readable description for this job

func (*CopySourceJobHandler) Execute

func (j *CopySourceJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will copy the source&rel match from the repo to the target

type CreateRepoJobHandler

type CreateRepoJobHandler struct {
	// contains filtered or unexported fields
}

CreateRepoJobHandler is responsible for creating new repositories and should only ever be used in sequential queues.

func NewCreateRepoJobHandler

func NewCreateRepoJobHandler(j *JobEntry) (*CreateRepoJobHandler, error)

NewCreateRepoJobHandler will create a job handler for the input job and ensure it validates

func (*CreateRepoJobHandler) Describe

func (j *CreateRepoJobHandler) Describe() string

Describe returns a human readable description for this job

func (*CreateRepoJobHandler) Execute

func (j *CreateRepoJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will construct a new repository if possible

type DeleteRepoJobHandler

type DeleteRepoJobHandler struct {
	// contains filtered or unexported fields
}

DeleteRepoJobHandler is responsible for deleting existing repositories and should only ever be used in sequential queues.

func NewDeleteRepoJobHandler

func NewDeleteRepoJobHandler(j *JobEntry) (*DeleteRepoJobHandler, error)

NewDeleteRepoJobHandler will create a job handler for the input job and ensure it validates

func (*DeleteRepoJobHandler) Describe

func (j *DeleteRepoJobHandler) Describe() string

Describe returns a human readable description for this job

func (*DeleteRepoJobHandler) Execute

func (j *DeleteRepoJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will delete an existing repository

type DeltaJobHandler

type DeltaJobHandler struct {
	// contains filtered or unexported fields
}

DeltaJobHandler is responsible for producing deltas for a package and should only ever be used in async queues. Deltas may take some time to produce and shouldn't be allowed to block the sequential processing queue.

func NewDeltaJobHandler

func NewDeltaJobHandler(j *JobEntry, indexRepo bool) (*DeltaJobHandler, error)

NewDeltaJobHandler will create a job handler for the input job and ensure it validates

func (*DeltaJobHandler) Describe

func (j *DeltaJobHandler) Describe() string

Describe returns a human readable description for this job

func (*DeltaJobHandler) Execute

func (j *DeltaJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will delta the target package within the target repository.

type DeltaRepoJobHandler

type DeltaRepoJobHandler struct {
	// contains filtered or unexported fields
}

DeltaRepoJobHandler is responsible for delta'ing repositories and should only ever be used in sequential queues.

func NewDeltaRepoJobHandler

func NewDeltaRepoJobHandler(j *JobEntry) (*DeltaRepoJobHandler, error)

NewDeltaRepoJobHandler will create a job handler for the input job and ensure it validates

func (*DeltaRepoJobHandler) Describe

func (j *DeltaRepoJobHandler) Describe() string

Describe returns a human readable description for this job

func (*DeltaRepoJobHandler) Execute

func (j *DeltaRepoJobHandler) Execute(jproc *Processor, manager *core.Manager) error

Execute will delta the given repository if possible. Note that it will NOT index the repository; this is a separate step as it takes a significant amount of time to fully produce all initial deltas.

This operation is ideally only used after the first import of a repository; after that, deltas will be produced on the fly.

type IndexRecord

type IndexRecord struct {
	Index uint64
}

IndexRecord is just a simple helper to store the index record.

type IndexRepoJobHandler

type IndexRepoJobHandler struct {
	// contains filtered or unexported fields
}

IndexRepoJobHandler is responsible for indexing repositories and should only ever be used in sequential queues.

func NewIndexRepoJobHandler

func NewIndexRepoJobHandler(j *JobEntry) (*IndexRepoJobHandler, error)

NewIndexRepoJobHandler will create a job handler for the input job and ensure it validates

func (*IndexRepoJobHandler) Describe

func (j *IndexRepoJobHandler) Describe() string

Describe returns a human readable description for this job

func (*IndexRepoJobHandler) Execute

func (j *IndexRepoJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will index the given repository if possible

type JobEntry

type JobEntry struct {
	Type    JobType
	Claimed bool
	Params  []string
	Timing  libferry.TimingInformation // Store all timing information
	// contains filtered or unexported fields
}

JobEntry is an entry in the JobQueue

func Deserialize

func Deserialize(serial []byte) (*JobEntry, error)

Deserialize uses Gob decoding to convert a byte slice to a JobEntry

func NewBulkAddJob

func NewBulkAddJob(id string, pkgs []string) *JobEntry

NewBulkAddJob will return a job suitable for adding to the job processor

func NewCloneRepoJob

func NewCloneRepoJob(repoID, newClone string, cloneAll bool) *JobEntry

NewCloneRepoJob will return a job suitable for adding to the job processor

func NewCopySourceJob

func NewCopySourceJob(repoID, target, source string, release int) *JobEntry

NewCopySourceJob will return a job suitable for adding to the job processor

func NewCreateRepoJob

func NewCreateRepoJob(id string) *JobEntry

NewCreateRepoJob will return a job suitable for adding to the job processor

func NewDeleteRepoJob

func NewDeleteRepoJob(id string) *JobEntry

NewDeleteRepoJob will return a job suitable for adding to the job processor

func NewDeltaIndexJob

func NewDeltaIndexJob(repoID, packageID string) *JobEntry

NewDeltaIndexJob will return a new job for creating delta packages as well as scheduling an index operation when complete.

func NewDeltaJob

func NewDeltaJob(repoID, packageID string) *JobEntry

NewDeltaJob will return a job suitable for adding to the job processor

func NewDeltaRepoJob

func NewDeltaRepoJob(id string) *JobEntry

NewDeltaRepoJob will return a job suitable for adding to the job processor

func NewIndexRepoJob

func NewIndexRepoJob(id string) *JobEntry

NewIndexRepoJob will return a job suitable for adding to the job processor

func NewPullRepoJob

func NewPullRepoJob(sourceID, targetID string) *JobEntry

NewPullRepoJob will return a job suitable for adding to the job processor

func NewRemoveSourceJob

func NewRemoveSourceJob(repoID, source string, release int) *JobEntry

NewRemoveSourceJob will return a job suitable for adding to the job processor

func NewTransitJob

func NewTransitJob(path string) *JobEntry

NewTransitJob will return a job suitable for adding to the job processor

func NewTrimObsoleteJob

func NewTrimObsoleteJob(id string) *JobEntry

NewTrimObsoleteJob will return a job suitable for adding to the job processor

func NewTrimPackagesJob

func NewTrimPackagesJob(repoID string, maxKeep int) *JobEntry

NewTrimPackagesJob will return a job suitable for adding to the job processor

func (*JobEntry) GetID

func (j *JobEntry) GetID() string

GetID gets the true numerical ID for this job entry

func (*JobEntry) Serialize

func (j *JobEntry) Serialize() (result []byte, err error)

Serialize uses Gob encoding to convert a JobEntry to a byte slice

type JobFetcher

type JobFetcher func() (*JobEntry, error)

JobFetcher will be provided by either the Async or Sequential claim functions

type JobHandler

type JobHandler interface {

	// Execute will attempt to execute the given job
	Execute(proc *Processor, m *core.Manager) error

	// Describe will return an appropriate description for the job
	Describe() string
}

A JobHandler is created for each JobEntry, to provide specialised handling of the job type

func NewJobHandler

func NewJobHandler(j *JobEntry) (JobHandler, error)

NewJobHandler will return a handler that is loaded only during the execution of a previously serialised job

type JobReaper

type JobReaper func(j *JobEntry) error

JobReaper will be provided by either the Async or Sequential retire functions

type JobStore

type JobStore struct {
	// contains filtered or unexported fields
}

JobStore handles the storage and manipulation of incomplete jobs

func NewStore

func NewStore(path string) (*JobStore, error)

NewStore creates a fully initialized JobStore and sets up Bolt Buckets as needed

func (*JobStore) ActiveJobs

func (s *JobStore) ActiveJobs() ([]*libferry.Job, error)

ActiveJobs will attempt to return a list of active jobs within the scheduler suitable for consumption by the CLI client

func (*JobStore) ClaimAsyncJob

func (s *JobStore) ClaimAsyncJob() (*JobEntry, error)

ClaimAsyncJob gets the first available asynchronous job, if one exists

func (*JobStore) ClaimSequentialJob

func (s *JobStore) ClaimSequentialJob() (*JobEntry, error)

ClaimSequentialJob gets the first available synchronous job, if one exists

func (*JobStore) Close

func (s *JobStore) Close()

Close will clean up our private job database

func (*JobStore) CompletedJobs

func (s *JobStore) CompletedJobs() ([]*libferry.Job, error)

CompletedJobs will return all successfully completed jobs still stored

func (*JobStore) FailedJobs

func (s *JobStore) FailedJobs() ([]*libferry.Job, error)

FailedJobs will return all failed jobs that are still stored

func (*JobStore) PushAsyncJob

func (s *JobStore) PushAsyncJob(j *JobEntry) error

PushAsyncJob will enqueue a new asynchronous job

func (*JobStore) PushSequentialJob

func (s *JobStore) PushSequentialJob(j *JobEntry) error

PushSequentialJob will enqueue a new sequential job

func (*JobStore) ResetCompleted

func (s *JobStore) ResetCompleted() error

ResetCompleted will remove all completion records from our store and reset the pointer

func (*JobStore) ResetFailed

func (s *JobStore) ResetFailed() error

ResetFailed will remove all fail records from our store and reset the pointer

func (*JobStore) RetireAsyncJob

func (s *JobStore) RetireAsyncJob(j *JobEntry) error

RetireAsyncJob removes a completed asynchronous job

func (*JobStore) RetireSequentialJob

func (s *JobStore) RetireSequentialJob(j *JobEntry) error

RetireSequentialJob removes a completed synchronous job

func (*JobStore) UnclaimAsync

func (s *JobStore) UnclaimAsync() error

UnclaimAsync will find all claimed async jobs and unclaim them again

func (*JobStore) UnclaimSequential

func (s *JobStore) UnclaimSequential() error

UnclaimSequential will find all claimed sequential jobs and unclaim them again

type JobType

type JobType string

JobType is a string representation of a kind of job

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

A Processor is responsible for the main dispatch and bulking of jobs to ensure they're handled in the most optimal fashion.

func NewProcessor

func NewProcessor(m *core.Manager, store *JobStore, njobs int) *Processor

NewProcessor will return a new Processor with the specified number of jobs. Note that "njobs" only refers to the number of *background jobs*; the majority of operations will run sequentially.

func (*Processor) Begin

func (j *Processor) Begin()

Begin will start the main job processor in parallel

func (*Processor) Close

func (j *Processor) Close()

Close an existing Processor, waiting for all jobs to complete

func (*Processor) PushJob

func (j *Processor) PushJob(job *JobEntry)

PushJob will automatically determine which queue to push a job to and place it there for immediate execution

type PullRepoJobHandler

type PullRepoJobHandler struct {
	// contains filtered or unexported fields
}

PullRepoJobHandler is responsible for pulling a repository

func NewPullRepoJobHandler

func NewPullRepoJobHandler(j *JobEntry) (*PullRepoJobHandler, error)

NewPullRepoJobHandler will create a job handler for the input job and ensure it validates

func (*PullRepoJobHandler) Describe

func (j *PullRepoJobHandler) Describe() string

Describe returns a human readable description for this job

func (*PullRepoJobHandler) Execute

func (j *PullRepoJobHandler) Execute(jproc *Processor, manager *core.Manager) error

Execute will attempt to pull the repos

type RemoveSourceJobHandler

type RemoveSourceJobHandler struct {
	// contains filtered or unexported fields
}

RemoveSourceJobHandler is responsible for removing packages by identifiers

func NewRemoveSourceJobHandler

func NewRemoveSourceJobHandler(j *JobEntry) (*RemoveSourceJobHandler, error)

NewRemoveSourceJobHandler will create a job handler for the input job and ensure it validates

func (*RemoveSourceJobHandler) Describe

func (j *RemoveSourceJobHandler) Describe() string

Describe returns a human readable description for this job

func (*RemoveSourceJobHandler) Execute

func (j *RemoveSourceJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will remove the source&rel match from the repo

type TransitJobHandler

type TransitJobHandler struct {
	// contains filtered or unexported fields
}

TransitJobHandler is responsible for accepting new upload payloads in the repository

func NewTransitJobHandler

func NewTransitJobHandler(j *JobEntry) (*TransitJobHandler, error)

NewTransitJobHandler will create a job handler for the input job and ensure it validates

func (*TransitJobHandler) Describe

func (j *TransitJobHandler) Describe() string

Describe returns a human readable description for this job

func (*TransitJobHandler) Execute

func (j *TransitJobHandler) Execute(jproc *Processor, manager *core.Manager) error

Execute will process incoming .tram files for potential repo inclusion

type TrimObsoleteJobHandler

type TrimObsoleteJobHandler struct {
	// contains filtered or unexported fields
}

TrimObsoleteJobHandler is responsible for permanently removing obsolete packages from a repository and should only ever be used in sequential queues.

func NewTrimObsoleteJobHandler

func NewTrimObsoleteJobHandler(j *JobEntry) (*TrimObsoleteJobHandler, error)

NewTrimObsoleteJobHandler will create a job handler for the input job and ensure it validates

func (*TrimObsoleteJobHandler) Describe

func (j *TrimObsoleteJobHandler) Describe() string

Describe returns a human readable description for this job

func (*TrimObsoleteJobHandler) Execute

func (j *TrimObsoleteJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will try to remove any excessive packages marked as Obsolete

type TrimPackagesJobHandler

type TrimPackagesJobHandler struct {
	// contains filtered or unexported fields
}

TrimPackagesJobHandler is responsible for trimming excess packages from a repository

func NewTrimPackagesJobHandler

func NewTrimPackagesJobHandler(j *JobEntry) (*TrimPackagesJobHandler, error)

NewTrimPackagesJobHandler will create a job handler for the input job and ensure it validates

func (*TrimPackagesJobHandler) Describe

func (j *TrimPackagesJobHandler) Describe() string

Describe returns a human readable description for this job

func (*TrimPackagesJobHandler) Execute

func (j *TrimPackagesJobHandler) Execute(_ *Processor, manager *core.Manager) error

Execute will attempt removal of excessive packages in the index

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

A Worker is used to execute some portion of the incoming workload, and will keep polling for the correct job type to process

func NewWorkerAsync

func NewWorkerAsync(processor *Processor) *Worker

NewWorkerAsync will return an asynchronous processing worker which will only pull from the store's async job queue

func NewWorkerSequential

func NewWorkerSequential(processor *Processor) *Worker

NewWorkerSequential will return a sequential worker operating on the main sequential job loop

func (*Worker) Start

func (w *Worker) Start()

Start will begin the main execution of this worker, and will continuously poll for new jobs with an increasing increment (with a ceiling limit)

func (*Worker) Stop

func (w *Worker) Stop()

Stop will demand that all new requests are no longer processed

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL