services

package
v0.0.0-...-8a90710
Warning

This package is not in the latest version of its module.

Published: Apr 2, 2019 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package services is the mediation layer between the server and database queries.

Logic that is unrelated to validating request input or turning errors into HTTP responses belongs here.

Constants

This section is empty.

Variables

var DefaultTimeout = 5 * time.Minute

DefaultTimeout is the default amount of time a JobProcessor should wait for a job to complete, once it's been sent to the downstream server.

var UnavailableSleepFactor = 500

UnavailableSleepFactor determines how long the application should sleep between 503 Service Unavailable downstream responses.

Functions

func ArchiveStuckJobs

func ArchiveStuckJobs(olderThan time.Duration) error

ArchiveStuckJobs marks as failed any queued jobs with an updated_at timestamp older than the olderThan value.

func GetSleepDuration

func GetSleepDuration(sleepFactor float64, failedAttempts uint32) time.Duration

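GetSleepDuration calculates how long to sleep, given a sleep factor and the number of failed attempts so far. See the SleepFactor field of JobProcessor for the sleep formula.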

func HandleStatusCallback

func HandleStatusCallback(id types.PrefixUUID, name string, status models.JobStatus, attempt uint8, retryable bool) error

HandleStatusCallback updates a queued job with the provided status and the attempts remaining. Likely the job will either be inserted into archived_jobs and removed from queued_jobs, or the job will have its attempts counter decremented in queued_jobs.

This can return an error if any of the following happens: the archived_job already exists; the queued job no longer exists by the time you attempt to delete it; or the number of attempts for the queued job doesn't match the passed-in value (slow).

func WatchStuckJobs

func WatchStuckJobs(interval time.Duration, olderThan time.Duration)

WatchStuckJobs polls the queued_jobs table for stuck jobs (defined as in-progress jobs that haven't been updated in olderThan time), and marks them as failed.

Types

type JobProcessor

type JobProcessor struct {
	// A Client for making requests to the downstream server.
	Client *downstream.Client

	// Amount of time we should wait for the downstream server to hit the
	// callback before marking the job as failed.
	Timeout time.Duration

	// Multiplier used to determine how long to sleep between failed attempts
	// to acquire a job. The formula for sleeps is 10 * (Factor) ^ (Attempts)
	// ms. Set to 0 to not sleep between attempts.
	SleepFactor float64
}

JobProcessor is the default implementation of the Worker interface.

func NewJobProcessor

func NewJobProcessor(downstreamUrl string, downstreamPassword string) *JobProcessor

NewJobProcessor creates a services.JobProcessor that makes requests to the downstream url.

By default the Client uses Basic Auth with "jobs" as the username, and the configured password as the password.

If the downstream server does not hit the callback, jobs sent to the downstream server are timed out and marked as failed after DefaultTimeout has elapsed.
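
For readers unfamiliar with Basic Auth, the sketch below shows what a request authenticated as the "jobs" user looks like using only the standard library. It does not use downstream.Client, whose API is not shown here; newDownstreamRequest, the POST method, and the example password are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// newDownstreamRequest builds a request that authenticates with Basic Auth,
// using "jobs" as the username and the given password.
func newDownstreamRequest(url, password string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth("jobs", password)
	return req, nil
}

func main() {
	// A throwaway local server that echoes the credentials it received.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		user, pass, ok := r.BasicAuth()
		fmt.Fprintf(w, "user=%s pass=%s ok=%t", user, pass, ok)
	}))
	defer srv.Close()

	req, err := newDownstreamRequest(srv.URL, "example-password")
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```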

func (*JobProcessor) DoWork

func (jp *JobProcessor) DoWork(qj *models.QueuedJob) error

DoWork sends the given queued job to the downstream service, then waits for it to complete.

func (JobProcessor) Sleep

func (jp JobProcessor) Sleep(failedAttempts uint32) time.Duration
