Documentation
¶
Overview ¶
* Package state provides an interface for a beanstalkd job, state for * job, tube and a connected client. and job state machine with the * states and transitions as defined in the protocol. * https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt * * Implementations include an in-memory implementation of the interfaces
Index ¶
- Variables
- func GetStatistics(nowSecs int64, j Job) map[string]interface{}
- type BuriedJobs
- type ClientID
- type ClientResvEntry
- type DelayedJobs
- type IndexEntry
- type JSM
- type JSMSnapshot
- type Job
- type JobEntry
- type JobHeap
- type JobID
- type JobIndex
- type JobState
- type PriorityJobs
- type Reservation
- type ReservationStatus
- type ReservedJobs
- type ResultError
- type TubeName
Constants ¶
This section is empty.
Variables ¶
var ( // ErrInvalidIndex no job entry found at the index ErrInvalidIndex = errors.New("provided index is out of range of the heap") // ErrMismatchJobEntry - returned when job entry at index does not match the heap's value ErrMismatchJobEntry = errors.New("job entry at index does not match provided entry") // ErrEntryExists - returned when an entry exists in the existing map/set to prevents from overriding ErrEntryExists = errors.New("entry exists in container") // ErrEntryMissing - returned when an entry is not found in the container ErrEntryMissing = errors.New("entry not found in container") // ErrContainerEmpty - returned when the container such as a list/map/slice etc is empty ErrContainerEmpty = errors.New("the container is empty") // ErrInvalidJobTransition - the current state of the job prevents this transition ErrInvalidJobTransition = errors.New("invalid transition") // ErrInvalidOperation - The state indicates that this op cannot be done ErrInvalidOperation = errors.New("invalid operation due to the current state") ErrUnauthorizedOperation = errors.New("client is not authorized to perform this operation") // ErrCancel - indicates the request is cancelled ErrCancel = errors.New("cancelled") // ErrNoReservation - indicates that a request for a reservation could not be completed ErrNoReservationFound = errors.New("no reservation could be found") // ErrInvalidResvTimeout - indicates the provided reservation timeout is invalid ErrInvalidResvTimeout = errors.New("the provided reservation timeout is invalid") // ErrClientIsWaitingForReservation - Indicates the client is waiting for a reservation ErrClientIsWaitingForReservation = errors.New("the request client cannot request for another reservation") )
Functions ¶
func GetStatistics ¶
Types ¶
type BuriedJobs ¶
type BuriedJobs JobHeap
BuriedJobs is a JobHeap, with jobs ordered by its BuriedAt field. Lower (earlier) BuriedAt take a higher precedence If two jobs have the same BuriedAt value, the lower job id gets precedence
type ClientResvEntry ¶
type ClientResvEntry struct { // ID of the client making reservations CliID ClientID // Tubes are the tube names watched by this client // for this reservation WatchedTubes []TubeName // reservation Deadline timestamp ResvDeadlineAt int64 // Indicates if this entry is waiting for a reservation IsWaitingForResv bool // clock at which the client needs some processing // If client IsWaitingForResv is set to true // - if there is a job already reserved at the lowest job's // deadline is within a second of now, then send a DEADLINE_SOON // and un-reserve the client // - If there are no jobs reserved at the current time is past // the client's reservation ResvDeadlineAt, then un-reserve the client // Check to see if any reservations need to be cleaned TickAt int64 // the request ID of the reservation ReqID string // Index of the client in the client Heap HeapIndex int }
Represents a client reservation as requested by the client
type DelayedJobs ¶
type DelayedJobs JobHeap
DelayedJobs is a JobHeap, with jobs ordered by its ReadyAt field. Lower (earlier) ReadyAt takes a higher precedence.
func NewDelayedJobs ¶
func NewDelayedJobs() DelayedJobs
type IndexEntry ¶
type IndexEntry struct {
// contains filtered or unexported fields
}
type JSM ¶
type JSM interface { // Put makes a new job. It initializes the job with // a unique identifier, sets state to Ready or Delayed based // on the delay parameter. Put(nowSeconds int64, priority uint32, delay int64, ttr int, bodySize int, body []byte, tubeName TubeName) (JobID, error) // Delete, removes a job specified by the id by a specific client Delete(jobID JobID, clientID ClientID) error // PeekDelayedJob returns the job in the Delay state in order of // priority for this tube. A job with and earlier (lower) delay // takes higher precedence. PeekDelayedJob(tubeName TubeName) (Job, error) // PeekReadyJob returns the job in the Ready state in order of // priority for this tube. A job with a lower priority value // takes higher precedence. PeekReadyJob(tubeName TubeName) (Job, error) // PeekReadyJob returns the job in the Buried state in order of // priority for this tube. A job which was buried first takes higher precedence. PeekBuriedJob(tubeName TubeName) (Job, error) // GetJob returns the job by its id GetJob(id JobID) (Job, error) // Release transitions this reserved job to a Ready state Release(jobID JobID, clientID ClientID) error // ReleaseWith transitions this reserved job to a Ready or Delayed state // with a modified priority and delay ReleaseWith(nowSeconds int64, jobID JobID, clientID ClientID, pri uint32, delay int64) error // Extend a reserved job's reservation TTL by its TTR (time-to-run) Touch(nowSeconds int64, jobID JobID, clientID ClientID) error // Bury this job (from a reserved state) Bury(nowSeconds int64, jobID JobID, priority uint32, clientID ClientID) error // Kick this job from buried state to a ready state Kick(jobID JobID) error // Kick atmost n jobs from the specified tube to ready state KickN(name TubeName, n int) (int, error) // Retrieve the statistics of this job // The stats-job data is a YAML byte slice representing a single dictionary of string // keys to scalar values. It contains these keys: // - "id" is the job id // - "tube" is the name of the tube that contains this job // - "state" is "ready" or "delayed" or "reserved" or "buried" // - "pri" is the priority value set by the put, release, or bury commands. // - "age" is the time in seconds since the put command that created this job. // - "delay" is the integer number of seconds to wait before putting this job in // the ready queue. // - "ttr" -- time to run -- is the integer number of seconds a worker is // allowed to run this job. // - "time-left" is the number of seconds left until the server puts this job // into the ready queue. This number is only meaningful if the job is // reserved or delayed. If the job is reserved and this amount of time // elapses before its state changes, it is considered to have timed out. // - "file" this will be 0. // - "reserves" is the number of times this job has been reserved. // - "timeouts" is the number of times this job has timed out during a // reservation. // - "releases" is the number of times a client has released this job from a // reservation. // - "buries" is the number of times this job has been buried. // - "kicks" is the number of times this job has been kicked. GetStatsJobAsYaml(nowSeconds int64, id JobID) ([]byte, error) // Retrieve tube statistics // // The stats-job data is a YAML byte slice representing a single dictionary of string // keys to scalar values. It contains these keys: // - "name" is the tube's name. // - "current-jobs-ready" is the number of jobs in the ready queue in this tube. // - "current-jobs-reserved" is the number of jobs reserved by all clients in this tube. // - "current-jobs-delayed" is the number of delayed jobs in this tube. // - "current-jobs-buried" is the number of buried jobs in this tube. // - "current-waiting" is the number of open connections that have issued a // reserve command while watching this tube but not yet received a response. // // The following are not implemented but have placeholders for backward-compat: // - "current-jobs-urgent" always zero // - "total-jobs" is always zero // - "current-using" is always zero // - "current-watching" is the number of open connections that are currently // watching this tube. // - "pause" is the number of seconds the tube has been paused for. // - "cmd-delete" is the cumulative number of delete commands for this tube // - "cmd-pause-tube" is the cumulative number of pause-tube commands for this tube. // - "pause-time-left" is the number of seconds until the tube is un-paused. GetStatsTubeAsYaml(nowSeconds int64, tubeName TubeName) ([]byte, error) // Retrieve overall statistics GetStatsAsYaml(nowSeconds int64) ([]byte, error) // Returns an interface that allows a caller to snapshot the current // state of the JSM. Callers of the interface should not be done across // go-routines. Snapshot() (JSMSnapshot, error) // AppendReservation Appends a new Reservation Request for a client, and the specified set of tubes // if the timeoutSecs is zero, then an infinite timeout is assumed. // // A pointer to a Reservation struct is returned which encapsulates a result if a reservation // was handled or not AppendReservation(clientId ClientID, reqID string, watchedTubes []TubeName, nowSecs, deadlineAt int64) (*Reservation, error) // Tick runs a step of the job state machine with the current time. // // This call returns all the allocated reservations in this tick call Tick(nowSecs int64) ([]*Reservation, error) // GetTubes returns the tubeNames of all current tubes GetTubes() ([]TubeName, error) // CheckClientState queries the job state machine whether the provided list of clientIds are waiting for reservations. // // The response returns the ClientIDs (i) which are waiting for reservations, (ii) those which are not waiting and (iii) // those which are missing or an error. CheckClientState(clientIDs []ClientID) ([]ClientID, []ClientID, []ClientID, error) // Close or stop this jobstatemachine Stop() error }
JSM provides methods for the beanstalkd job state machine. put with delay release with delay
----------------> [DELAYED] <------------. | | touch (extend ttr) | timeout/ttr | .----. | | | | put v reserve | | v delete -----------------> [READY] ------------> [RESERVED] --------> *poof* ^ ^ | | | | ^\ release | | | | \ `-------------' | | | \ | | | \ timeout/ttr , | | `-------------- | | | | kick | | | | bury | [BURIED] <-----------------' | | delete `--------> *poof*
type JSMSnapshot ¶
type JSMSnapshot interface { // SnapshotJobs returns a read-only job channel. This allows a caller // to iterate through the jobs sent on the channel, when all the jobs // in the state machine are returned, this method closes the channel, // signaling the end of this snapshot. // // SnapshotJobs is used to support log compaction. This call should // an be used to save a point-in-time snapshot of the FSM. // // SnapshotJobs should not be called to the JSM across go-routines, // this is the default behavior (unless an implementation forces to override this) // A caller is recommended to clone this job SnapshotJobs() (<-chan Job, error) // SnapshotClients returns a read-only job channel. This allows a caller // to iterate through the clientResvEntries sent on the channel, when // all the entries in the state machine are returned, this method closes // the channel, signaling the end of this snapshot. // // SnapshotClients is used to support log compaction. This call should // an be used to save a point-in-time snapshot of the FSM. // // SnapshotClients should not be called to the JSM across go-routines, // this is the default behavior (unless an implementation forces to override this) // A caller is recommended to clone this job SnapshotClients() (<-chan *ClientResvEntry, error) // RestoreJobs takes jobsCh, a write-only job channel, that allow a caller // to send jobs to be added to the job state machine (JSM). // // Once RestoreJobs and RestoreClients are complete. call FinalizeRestore // which replaces the current state of JSM with the jobs provided in the // channel. // // RestoreJobs takes an additional context which can be used to signal // a cancellation. In this case, the method discards the jobs that were // provided on the channel, after the cancel is called // // Note: It is the responsibility of the caller to close the channels // and drain the jobsCh RestoreJobs(ctx context.Context, jobsCh <-chan Job) error // RestoreClients takes entriesCh, a write-only job channel, that allow // a caller to send clientResvEntry structs to be added to the job state // machine (JSM). // // Once RestoreJobs and RestoreClients are complete. call FinalizeRestore // which replaces the current state of JSM with the jobs provided in the // channel. // // RestoreClients takes an additional context which can be used to signal // a cancellation. In this case, the method discards the clientResvEntries // that were provided on the channel, after the cancel is called // // Note: It is the responsibility of the caller to close the channels // and drain the jobsCh RestoreClients(ctx context.Context, nClients int, entriesCh <-chan *ClientResvEntry) error // Finalize Restore overwrites the state of the job-state-machine // with the current state of the snapshot // // Once RestoreJobs and RestoreClients are complete. call FinalizeRestore // which replaces the current state of JSM with the jobs provided in the // channel. FinalizeRestore() }
JSMSnapshot provides methods allowing a caller to read & restore jobs out of the job state machine.
type Job ¶
type Job interface { // ID is a unique identifier integer for this job (generated by the server) ID() JobID // Priority is an integer < 2**32. Jobs with smaller priority values will be // scheduled before jobs with larger priorities. The most urgent priority is 0; // the least urgent priority is 4,294,967,295. Priority() uint32 // UpdatePriority to a new value. Return back the newly set value UpdatePriority(newPriority uint32) uint32 // Delay is an integer number of seconds to wait before putting the job in // the ready queue. The job will be in the "delayed" state during this time. // Maximum delay is 2**32-1. Delay() int64 // Update Delay to a new value. Return back the newly set value UpdateDelay(newDelay int64) int64 // TTR/time to run -- is an integer number of seconds to allow a worker // to run this job. This time is counted from the moment a worker reserves // this job. If the worker does not delete, release, or bury the job within // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. TTR() int // BodySize is an integer indicating the size of the job body, not including the // trailing "\r\n". This value must be less than max-job-size (default: 2**16) BodySize() int // Body is the job body -- a sequence of bytes of length BodySize Body() []byte // TubeName the name of the tube associated with this job TubeName() TubeName // CreatedAt - Indicates the time, when this job is created CreatedAt() int64 // ReadyAt - Indicates the time when the job is ready ReadyAt() int64 // Reset the ReadyAt time taking the current time in reference // Return back the new readyAt time UpdateReadyAt(nowSeconds int64) (int64, error) // Retrieve the current job state State() JobState // Update the job state UpdateState(newState JobState) // Return the time at which the reservation expires ExpiresAt() int64 // Reset the reservation time taking the current in reference // Return back the new reservation time UpdateReservation(nowSeconds int64) (int64, error) // ReservedBy returns the name of the client which has // reserved this job. Empty string if this job is not reserved ReservedBy() ClientID // Update the reservedBy client UpdateReservedBy(clientID ClientID) // Returns the time this specific job was buried BuriedAt() int64 // Reset the buriedAt value to zero ResetBuriedAt() // Update the buriedAt value to the current clock // Return back the new BuriedAt time UpdateBuriedAt(nowSeconds int64) int64 ReserveCount() uint32 IncReserveCount() TimeoutCount() uint32 IncTimeoutCount() ReleaseCount() uint32 IncReleaseCount() BuryCount() uint32 IncBuryCount() KickCount() uint32 IncKickCount() }
type JobEntry ¶
type JobEntry struct { Job // contains filtered or unexported fields }
JobEntry is an entry in the JobHeap
type JobHeap ¶
type JobHeap interface { // Enqueue appends an entry to the job heap in priority order Enqueue(job Job) *JobEntry // Dequeue returns a from the heap in priority order Dequeue() Job // RemoveAt removes a specific job entry. RemoveAt(jobEntry *JobEntry) (*JobEntry, error) // Len returns the heap length Len() int // Peek returns the top element of the heap without dequeuing it Peek() *JobEntry // Return the number of jobs found in the specific tube JobCountByTube(tubename TubeName) uint64 }
JobHeap is a binary heap of jobs
type JobIndex ¶
type JobIndex struct {
// contains filtered or unexported fields
}
An index of indexEntry indexed by a job ID
func NewJobIndex ¶
func NewJobIndex() *JobIndex
type PriorityJobs ¶
type PriorityJobs JobHeap
PriorityJobs is a JobHeap, with jobs ordered by its Priority. Lower priority values takes a higher precedence.
func NewPriorityJobs ¶
func NewPriorityJobs() PriorityJobs
type Reservation ¶
type Reservation struct { RequestId string ClientId ClientID JobId JobID Status ReservationStatus BodySize int Body []byte Error error }
func (Reservation) String ¶
func (r Reservation) String() string
type ReservationStatus ¶
type ReservationStatus int
go:generate stringer -type=ReservationStatus --output state_string.go
const ( Unknown ReservationStatus = iota Queued DeadlineSoon Matched Timeout Error )
type ReservedJobs ¶
type ReservedJobs JobHeap
ReservedJobs is a JobHeap, with jobs ordered by its ExpiresAt field. Lower (earlier) ExpiresAt take a higher precedence
func NewBuriedJobs ¶
func NewBuriedJobs() ReservedJobs
func NewReservedJobs ¶
func NewReservedJobs() ReservedJobs
type ResultError ¶
type ResultError struct { // ID of the request RequestID string // Identifier for the error code ErrorCode int32 // Error Err error }
func (*ResultError) Error ¶
func (r *ResultError) Error() string
func (*ResultError) Unwrap ¶
func (r *ResultError) Unwrap() error