Documentation ¶
Overview ¶
Package storage is an abstraction/utility layer over Redis.
Index ¶
- Constants
- Variables
- func AggregationFromMap(m map[string]string) (job.Aggregation, error)
- type Storage
- func (s *Storage) AggregationExists(a *job.Aggregation) (bool, error)
- func (s *Storage) GetAggregation(id string) (*job.Aggregation, error)
- func (s *Storage) GetJob(id string) (job.Job, error)
- func (s *Storage) GetStats(id string) ([]byte, error)
- func (s *Storage) JobExists(j *job.Job) (bool, error)
- func (s *Storage) PopCallback() (job.Job, error)
- func (s *Storage) PopJob(a *job.Aggregation) (job.Job, error)
- func (s *Storage) PopRip() (job.Job, error)
- func (s *Storage) QueueJobForDeletion(id string, delay time.Duration) error
- func (s *Storage) QueuePendingCallback(j *job.Job, delay time.Duration) error
- func (s *Storage) QueuePendingDownload(j *job.Job, delay time.Duration) error
- func (s *Storage) RemoveAggregation(id string) error
- func (s *Storage) RemoveJob(id string) error
- func (s *Storage) RetryCallback(j *job.Job) error
- func (s *Storage) SaveAggregation(a *job.Aggregation) error
- func (s *Storage) SaveJob(j *job.Job) error
- func (s *Storage) SetStats(id, stats string, expiration time.Duration) error
Constants ¶
const (
	// Each aggregation has a corresponding Redis Hash named in the form
	// "<AggrKeyPrefix><aggregation-id>" and containing various information
	// about the aggregation itself (eg. its limit).
	AggrKeyPrefix = "aggr:"

	// Job IDs of an individual aggregation exist in a Redis List named
	// in the form "<JobsKeyPrefix><aggregation-id>".
	JobsKeyPrefix = "jobs:"

	// Each Job has a corresponding Redis Hash named in the form
	// "<JobKeyPrefix><job-id>"
	JobKeyPrefix = "job:"

	// CallbackQueue contains IDs of jobs that are completed
	// and their callback is to be executed
	// TODO: this introduces coupling with the notifier. See how we can
	// separate it
	CallbackQueue = "CallbackQueue"

	// RIPQueue contains ids of jobs to be deleted
	RIPQueue = "JobDeletionQueue"
)
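The key naming scheme described in these comments can be sketched as plain string concatenation. This is a minimal illustration, not the package's own code: the constants are redeclared locally so the example compiles on its own, and the helpers `aggregationKey`, `jobsKey` and `jobKey` are hypothetical names that do not exist in the package.

```go
package main

import "fmt"

// Local copies of the documented values, redeclared only so this sketch
// is self-contained.
const (
	AggrKeyPrefix = "aggr:"
	JobsKeyPrefix = "jobs:"
	JobKeyPrefix  = "job:"
)

// aggregationKey is a hypothetical helper: it names the Redis Hash that
// holds an aggregation's information.
func aggregationKey(aggrID string) string { return AggrKeyPrefix + aggrID }

// jobsKey is a hypothetical helper: it names the key that holds the job IDs
// of an aggregation.
func jobsKey(aggrID string) string { return JobsKeyPrefix + aggrID }

// jobKey is a hypothetical helper: it names the Redis Hash of a single job.
func jobKey(jobID string) string { return JobKeyPrefix + jobID }

func main() {
	fmt.Println(aggregationKey("images"))
	fmt.Println(jobsKey("images"))
	fmt.Println(jobKey("42"))
}
```

The aggregation ID "images" and the job ID "42" are made-up values.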
Variables ¶
var (
	// ErrEmptyQueue is returned by ZPOP when there is no job in the queue
	ErrEmptyQueue = errors.New("Queue is empty")

	// ErrRetryLater is returned by ZPOP when there are only future jobs in the queue
	ErrRetryLater = errors.New("Retry again later")

	// ErrNotFound is returned by GetJob and GetAggregation when a requested
	// job, or aggregation respectively is not found in Redis.
	ErrNotFound = errors.New("Not Found")
)
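A caller will typically branch on these sentinel errors. The following is a minimal sketch of that pattern using `errors.Is` from the standard library. The variables are redeclared locally so the example compiles on its own, and `classify` together with its action names is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors; real code would use the
// variables exported by the storage package.
var (
	ErrEmptyQueue = errors.New("Queue is empty")
	ErrRetryLater = errors.New("Retry again later")
	ErrNotFound   = errors.New("Not Found")
)

// classify is a hypothetical helper that maps an error to the action a
// worker loop might take. The action names are made up for illustration.
func classify(err error) string {
	switch {
	case err == nil:
		return "process"
	case errors.Is(err, ErrEmptyQueue):
		return "idle" // nothing queued at all
	case errors.Is(err, ErrRetryLater):
		return "retry-later" // only future jobs are queued
	case errors.Is(err, ErrNotFound):
		return "skip" // the job or aggregation is gone
	default:
		return "fail" // anything else, e.g. a connection problem
	}
}

func main() {
	errs := []error{nil, ErrEmptyQueue, ErrRetryLater, ErrNotFound, errors.New("connection refused")}
	for _, err := range errs {
		fmt.Printf("%v -> %s\n", err, classify(err))
	}
}
```

Using `errors.Is` rather than `==` keeps the check working if an error is ever wrapped on its way to the caller. Whether the package wraps errors is not stated here.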
Functions ¶
func AggregationFromMap ¶ added in v0.1.0
func AggregationFromMap(m map[string]string) (job.Aggregation, error)
Types ¶
type Storage ¶
Storage wraps a redis.Client instance.
func New ¶
New returns a new Storage that can communicate with Redis. If Redis is not up, an error is returned.
Right after calling New, callers should set the AggrKeyPrefix, JobKeyPrefix and CallbackQueue fields on the returned storage.
func (*Storage) AggregationExists ¶
func (s *Storage) AggregationExists(a *job.Aggregation) (bool, error)
AggregationExists checks if the given aggregation exists in Redis. If a non-nil error is returned, the first returned value should be ignored.
func (*Storage) GetAggregation ¶
func (s *Storage) GetAggregation(id string) (*job.Aggregation, error)
GetAggregation fetches from Redis the aggregation denoted by id. In the case of ErrNotFound, the returned aggregation has valid ID and the default limit.
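Because the aggregation returned alongside ErrNotFound is still usable, a caller can treat that error as non-fatal. The sketch below models this with an in-memory stand-in. `Aggregation`, `fakeStore`, `limitFor` and the `defaultLimit` value are all hypothetical and exist only for illustration. The real job.Aggregation type and the real default limit may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented sentinel error.
var ErrNotFound = errors.New("Not Found")

// Aggregation is a simplified stand-in for job.Aggregation.
type Aggregation struct {
	ID    string
	Limit int
}

// defaultLimit is a made-up value used only by this sketch.
const defaultLimit = 4

// fakeStore is a hypothetical in-memory stand-in for Storage, mapping
// aggregation IDs to their limits.
type fakeStore map[string]int

// GetAggregation mimics the documented contract: on a miss it returns
// ErrNotFound together with an aggregation that has a valid ID and the
// default limit.
func (f fakeStore) GetAggregation(id string) (*Aggregation, error) {
	limit, ok := f[id]
	if !ok {
		return &Aggregation{ID: id, Limit: defaultLimit}, ErrNotFound
	}
	return &Aggregation{ID: id, Limit: limit}, nil
}

// limitFor returns the limit to apply for id. ErrNotFound is tolerated
// because the returned aggregation is still usable; other errors are not.
func limitFor(s fakeStore, id string) (int, error) {
	a, err := s.GetAggregation(id)
	if err != nil && !errors.Is(err, ErrNotFound) {
		return 0, err
	}
	return a.Limit, nil
}

func main() {
	s := fakeStore{"images": 10}
	for _, id := range []string{"images", "unknown"} {
		limit, err := limitFor(s, id)
		fmt.Println(id, limit, err)
	}
}
```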
func (*Storage) GetJob ¶
GetJob fetches the job with the given id from Redis. In the case of ErrNotFound, the returned job has valid ID and can be used further.
func (*Storage) JobExists ¶
JobExists checks if the given job exists in Redis. If a non-nil error is returned, the first returned value should be ignored.
func (*Storage) PopCallback ¶
PopCallback attempts to pop a Job from the callback queue. If it succeeds, the job with the popped ID is returned.
func (*Storage) PopJob ¶
PopJob attempts to pop a Job for the given aggregation. If it succeeds, the job with the popped ID is returned.
func (*Storage) PopRip ¶
PopRip fetches a job from the RIPQueue (if any) and reports any errors. If the queue is empty, ErrEmptyQueue is returned. Note: due to the nature of job deletion, the returned job is not guaranteed to be available in Redis.
func (*Storage) QueueJobForDeletion ¶
QueueJobForDeletion pushes the provided job id to RIPQueue and returns any errors. The job deletion can be delayed by the specified delay.
func (*Storage) QueuePendingCallback ¶
QueuePendingCallback sets the state of a job to "Pending", saves it and adds it to its aggregation queue. If a delay > 0 is given, the job is queued with a higher score, so it is actually processed later in time.
func (*Storage) QueuePendingDownload ¶
QueuePendingDownload sets the state of a job to "Pending", saves it and adds it to its aggregation queue. If a delay > 0 is given, the job is queued with a higher score, so it is actually processed later in time.
TODO: should we check that job already exists in redis? maybe do HSET instead?
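The relation between a delay, a score and the ErrEmptyQueue and ErrRetryLater errors can be sketched with a small in-memory queue. This is only a model under stated assumptions: it assumes the score is the time at which a job becomes due, which this page does not confirm, and `delayedQueue`, `entry`, `push`, `pop` and `demo` are hypothetical names rather than part of the package. Nothing here talks to Redis.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"time"
)

// Local copies of the documented sentinel errors.
var (
	ErrEmptyQueue = errors.New("Queue is empty")
	ErrRetryLater = errors.New("Retry again later")
)

// entry pairs a job ID with its score. Assumption: the score is the Unix
// time in nanoseconds at which the job becomes due.
type entry struct {
	id    string
	score int64
}

// delayedQueue is a hypothetical in-memory model of a score-ordered queue.
type delayedQueue struct {
	entries []entry
}

// push queues id so that it becomes due at now+delay. A larger delay means
// a higher score and therefore a later position in the queue.
func (q *delayedQueue) push(id string, now time.Time, delay time.Duration) {
	q.entries = append(q.entries, entry{id: id, score: now.Add(delay).UnixNano()})
	sort.SliceStable(q.entries, func(i, j int) bool {
		return q.entries[i].score < q.entries[j].score
	})
}

// pop returns the ID with the lowest score if it is already due.
func (q *delayedQueue) pop(now time.Time) (string, error) {
	if len(q.entries) == 0 {
		return "", ErrEmptyQueue
	}
	head := q.entries[0]
	if head.score > now.UnixNano() {
		return "", ErrRetryLater // only future jobs are queued
	}
	q.entries = q.entries[1:]
	return head.id, nil
}

// demo runs a fixed scenario and records the outcome of each pop.
func demo() []string {
	now := time.Unix(1000, 0)
	q := &delayedQueue{}
	q.push("late", now, 30*time.Second)
	q.push("soon", now, 0)

	var out []string
	record := func(id string, err error) {
		if err != nil {
			out = append(out, err.Error())
			return
		}
		out = append(out, id)
	}
	record(q.pop(now))
	record(q.pop(now))
	record(q.pop(now.Add(time.Minute)))
	record(q.pop(now.Add(time.Minute)))
	return out
}

func main() {
	for _, outcome := range demo() {
		fmt.Println(outcome)
	}
}
```

In the scenario, the job queued without a delay is popped first even though it was pushed second, and the delayed job only becomes available once the clock passes its score.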
func (*Storage) RemoveAggregation ¶
RemoveAggregation deletes the aggregation key from Redis.
func (*Storage) RetryCallback ¶
RetryCallback resets a job's callback state and injects it back to the callback queue. If the job is not found, an error is returned.
func (*Storage) SaveAggregation ¶
func (s *Storage) SaveAggregation(a *job.Aggregation) error
SaveAggregation creates or updates the given aggregation in Redis.