Documentation
¶
Index ¶
- type ErrJobNotFound
- type ErrQueueAlreadyExists
- type ErrQueueNotFound
- type EventRepository
- type EventStore
- type JobRepository
- type JobResult
- type JobStartInfo
- type QueueRepository
- type RedisEventRepository
- func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId string) (bool, error)
- func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)
- func (repo *RedisEventRepository) ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
- func (repo *RedisEventRepository) ReportEvent(message *api.EventMessage) error
- func (repo *RedisEventRepository) ReportEvents(messages []*api.EventMessage) error
- type RedisHealth
- type RedisJobRepository
- func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, error)
- func (repo *RedisJobRepository) AddRetryAttempt(jobId string) error
- func (repo *RedisJobRepository) DeleteJobs(jobs []*api.Job) (map[*api.Job]error, error)
- func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ([]*api.Job, error)
- func (repo *RedisJobRepository) FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)
- func (repo *RedisJobRepository) GetActiveJobIds(queue string, jobSetId string) ([]string, error)
- func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error)
- func (repo *RedisJobRepository) GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)
- func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*JobResult, error)
- func (repo *RedisJobRepository) GetLeasedJobIds(queue string) ([]string, error)
- func (repo *RedisJobRepository) GetNumberOfRetryAttempts(jobId string) (int, error)
- func (repo *RedisJobRepository) GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)
- func (repo *RedisJobRepository) GetQueueJobIds(queueName string) ([]string, error)
- func (repo *RedisJobRepository) GetQueueSizes(queues []*api.Queue) (sizes []int64, err error)
- func (repo *RedisJobRepository) IterateQueueJobs(queueName string, action func(*api.Job)) error
- func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error)
- func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error)
- func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)
- func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)
- func (repo *RedisJobRepository) UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)
- func (repo *RedisJobRepository) UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)
- type RedisQueueRepository
- func (r *RedisQueueRepository) CreateQueue(queue queue.Queue) error
- func (r *RedisQueueRepository) DeleteQueue(name string) error
- func (r *RedisQueueRepository) GetAllQueues() ([]queue.Queue, error)
- func (r *RedisQueueRepository) GetQueue(name string) (queue.Queue, error)
- func (r *RedisQueueRepository) UpdateQueue(queue queue.Queue) error
- type RedisSchedulingInfoRepository
- type RedisUsageRepository
- func (r *RedisUsageRepository) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)
- func (r *RedisUsageRepository) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)
- func (r *RedisUsageRepository) GetClusterPriority(clusterId string) (map[string]float64, error)
- func (r *RedisUsageRepository) GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)
- func (r *RedisUsageRepository) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error
- func (r *RedisUsageRepository) UpdateClusterLeased(report *api.ClusterLeasedReport) error
- type RunInfo
- type SchedulingInfoRepository
- type SubmitJobResult
- type UpdateJobResult
- type Usage
- type UsageRepository
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrJobNotFound ¶ added in v0.2.15
func (*ErrJobNotFound) Error ¶ added in v0.2.15
func (err *ErrJobNotFound) Error() string
type ErrQueueAlreadyExists ¶ added in v0.2.3
type ErrQueueAlreadyExists struct {
QueueName string
}
func (*ErrQueueAlreadyExists) Error ¶ added in v0.2.15
func (err *ErrQueueAlreadyExists) Error() string
type ErrQueueNotFound ¶ added in v0.2.3
type ErrQueueNotFound struct {
QueueName string
}
func (*ErrQueueNotFound) Error ¶ added in v0.2.15
func (err *ErrQueueNotFound) Error() string
type EventRepository ¶
type EventStore ¶ added in v0.1.6
type EventStore interface {
ReportEvents(message []*api.EventMessage) error
}
type JobRepository ¶
type JobRepository interface { PeekQueue(queue string, limit int64) ([]*api.Job, error) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) AddJobs(job []*api.Job) ([]*SubmitJobResult, error) GetJobsByIds(ids []string) ([]*JobResult, error) GetExistingJobsByIds(ids []string) ([]*api.Job, error) FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error) GetQueueSizes(queues []*api.Queue) (sizes []int64, e error) IterateQueueJobs(queueName string, action func(*api.Job)) error GetQueueJobIds(queueName string) ([]string, error) RenewLease(clusterId string, jobIds []string) (renewed []string, e error) ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, e error) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error) DeleteJobs(jobs []*api.Job) (map[*api.Job]error, error) GetActiveJobIds(queue string, jobSetId string) ([]string, error) GetLeasedJobIds(queue string) ([]string, error) UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error) UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error) GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error) GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error) AddRetryAttempt(jobId string) error GetNumberOfRetryAttempts(jobId string) (int, error) }
type JobResult ¶ added in v0.2.15
JobResult is used by GetJobsByIds to bundle a job with any error that occurred when getting the job.
type JobStartInfo ¶ added in v0.2.12
type QueueRepository ¶
type RedisEventRepository ¶
type RedisEventRepository struct {
// contains filtered or unexported fields
}
func NewRedisEventRepository ¶
func NewRedisEventRepository(db redis.UniversalClient, eventRetention configuration.EventRetentionPolicy) *RedisEventRepository
func (*RedisEventRepository) CheckStreamExists ¶ added in v0.2.19
func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId string) (bool, error)
func (*RedisEventRepository) GetLastMessageId ¶ added in v0.0.5
func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)
func (*RedisEventRepository) ReadEvents ¶
func (repo *RedisEventRepository) ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
func (*RedisEventRepository) ReportEvent ¶
func (repo *RedisEventRepository) ReportEvent(message *api.EventMessage) error
func (*RedisEventRepository) ReportEvents ¶
func (repo *RedisEventRepository) ReportEvents(messages []*api.EventMessage) error
type RedisHealth ¶ added in v0.2.3
type RedisHealth struct {
// contains filtered or unexported fields
}
func NewRedisHealth ¶ added in v0.2.3
func NewRedisHealth(db redis.UniversalClient) *RedisHealth
func (*RedisHealth) Check ¶ added in v0.2.3
func (r *RedisHealth) Check() error
type RedisJobRepository ¶
type RedisJobRepository struct {
// contains filtered or unexported fields
}
func NewRedisJobRepository ¶
func NewRedisJobRepository( db redis.UniversalClient, retentionPolicy configuration.DatabaseRetentionPolicy) *RedisJobRepository
func (*RedisJobRepository) AddJobs ¶ added in v0.0.3
func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, error)
func (*RedisJobRepository) AddRetryAttempt ¶ added in v0.1.16
func (repo *RedisJobRepository) AddRetryAttempt(jobId string) error
func (*RedisJobRepository) DeleteJobs ¶ added in v0.0.10
func (*RedisJobRepository) ExpireLeases ¶
func (*RedisJobRepository) FilterActiveQueues ¶
func (*RedisJobRepository) GetActiveJobIds ¶
func (repo *RedisJobRepository) GetActiveJobIds(queue string, jobSetId string) ([]string, error)
func (*RedisJobRepository) GetExistingJobsByIds ¶ added in v0.0.10
func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error)
GetExistingJobsByIds queries Redis for job details. Missing jobs are omitted, i.e., the returned list may be shorter than the provided list of IDs.
func (*RedisJobRepository) GetJobRunInfos ¶ added in v0.1.33
func (repo *RedisJobRepository) GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)
GetJobRunInfos returns run info for the cluster that each of the provided jobs is leased to. Jobs that are not leased to any cluster or that do not have a start time are omitted.
func (*RedisJobRepository) GetJobsByIds ¶
func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*JobResult, error)
GetJobsByIds attempts to get all requested jobs from the database. Any error in getting a job is set to the Err field of the corresponding JobResult.
func (*RedisJobRepository) GetLeasedJobIds ¶ added in v0.1.32
func (repo *RedisJobRepository) GetLeasedJobIds(queue string) ([]string, error)
func (*RedisJobRepository) GetNumberOfRetryAttempts ¶ added in v0.1.16
func (repo *RedisJobRepository) GetNumberOfRetryAttempts(jobId string) (int, error)
func (*RedisJobRepository) GetQueueActiveJobSets ¶ added in v0.1.2
func (repo *RedisJobRepository) GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)
GetQueueActiveJobSets returns a list of length equal to the number of unique job sets in the given queue, where each element contains the number of queued and leased jobs that are part of that job set.
func (*RedisJobRepository) GetQueueJobIds ¶ added in v0.1.26
func (repo *RedisJobRepository) GetQueueJobIds(queueName string) ([]string, error)
func (*RedisJobRepository) GetQueueSizes ¶
func (repo *RedisJobRepository) GetQueueSizes(queues []*api.Queue) (sizes []int64, err error)
func (*RedisJobRepository) IterateQueueJobs ¶ added in v0.1.24
func (repo *RedisJobRepository) IterateQueueJobs(queueName string, action func(*api.Job)) error
IterateQueueJobs calls action for each job in queue with name queueName.
TODO action should return an error, which could be propagated back to the caller of this method.
func (*RedisJobRepository) PeekQueue ¶
PeekQueue returns the highest-priority jobs in the given queue. At most limit jobs are returned.
func (*RedisJobRepository) RenewLease ¶
func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error)
func (*RedisJobRepository) ReturnLease ¶
func (*RedisJobRepository) TryLeaseJobs ¶
func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)
TryLeaseJobs attempts to assign jobs to a given cluster and returns a list composed of the jobs that were successfully leased.
func (*RedisJobRepository) UpdateJobs ¶ added in v0.2.6
func (repo *RedisJobRepository) UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)
TODO Redis supports setting a retry parameter. Why do we re-implement that functionality?
func (*RedisJobRepository) UpdateStartTime ¶ added in v0.1.32
func (repo *RedisJobRepository) UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)
type RedisQueueRepository ¶
type RedisQueueRepository struct {
// contains filtered or unexported fields
}
func NewRedisQueueRepository ¶
func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository
func (*RedisQueueRepository) CreateQueue ¶
func (r *RedisQueueRepository) CreateQueue(queue queue.Queue) error
func (*RedisQueueRepository) DeleteQueue ¶ added in v0.1.14
func (r *RedisQueueRepository) DeleteQueue(name string) error
func (*RedisQueueRepository) GetAllQueues ¶
func (r *RedisQueueRepository) GetAllQueues() ([]queue.Queue, error)
func (*RedisQueueRepository) GetQueue ¶
func (r *RedisQueueRepository) GetQueue(name string) (queue.Queue, error)
func (*RedisQueueRepository) UpdateQueue ¶ added in v0.2.3
func (r *RedisQueueRepository) UpdateQueue(queue queue.Queue) error
TODO If the queue to be updated is deleted between this method checking if the queue exists and making the update, the deleted queue is re-added to Redis. There's no "update if exists" operation in Redis, so we need to do this with a script or transaction.
type RedisSchedulingInfoRepository ¶ added in v0.1.6
type RedisSchedulingInfoRepository struct {
// contains filtered or unexported fields
}
func NewRedisSchedulingInfoRepository ¶ added in v0.1.6
func NewRedisSchedulingInfoRepository(db redis.UniversalClient) *RedisSchedulingInfoRepository
func (*RedisSchedulingInfoRepository) GetClusterSchedulingInfo ¶ added in v0.1.6
func (r *RedisSchedulingInfoRepository) GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error)
func (*RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo ¶ added in v0.1.6
func (r *RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error
type RedisUsageRepository ¶
type RedisUsageRepository struct {
// contains filtered or unexported fields
}
func NewRedisUsageRepository ¶
func NewRedisUsageRepository(db redis.UniversalClient) *RedisUsageRepository
func (*RedisUsageRepository) GetClusterLeasedReports ¶ added in v0.1.6
func (r *RedisUsageRepository) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)
func (*RedisUsageRepository) GetClusterPriorities ¶
func (r *RedisUsageRepository) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)
GetClusterPriorities returns a map from clusterId to clusterPriority. This method makes a single aggregated call to the database, making this method more efficient than calling GetClusterPriority repeatedly.
func (*RedisUsageRepository) GetClusterPriority ¶
func (r *RedisUsageRepository) GetClusterPriority(clusterId string) (map[string]float64, error)
func (*RedisUsageRepository) GetClusterUsageReports ¶
func (r *RedisUsageRepository) GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)
func (*RedisUsageRepository) UpdateCluster ¶
func (r *RedisUsageRepository) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error
func (*RedisUsageRepository) UpdateClusterLeased ¶ added in v0.1.6
func (r *RedisUsageRepository) UpdateClusterLeased(report *api.ClusterLeasedReport) error
UpdateClusterLeased updates the count of resources leased to a particular cluster.
type SchedulingInfoRepository ¶ added in v0.1.6
type SchedulingInfoRepository interface { GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error) UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error }
type SubmitJobResult ¶ added in v0.0.3
type SubmitJobResult struct { JobId string SubmittedJob *api.Job DuplicateDetected bool Error error }
TODO DuplicateDetected should be removed in favor of setting the error to indicate the job already exists (e.g., by creating ErrJobExists).
type UpdateJobResult ¶ added in v0.2.8
type UsageRepository ¶
type UsageRepository interface { GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error) GetClusterPriority(clusterId string) (map[string]float64, error) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error UpdateClusterLeased(report *api.ClusterLeasedReport) error }