Documentation ¶
Index ¶
- type ErrJobNotFound
- type ErrQueueAlreadyExists
- type ErrQueueNotFound
- type EventRepository
- type EventStore
- type JobRepository
- type JobResult
- type JobSetFilter
- type JobStartInfo
- type QueueRepository
- type RedisEventRepository
- func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId string) (bool, error)
- func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)
- func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
- type RedisHealth
- type RedisJobRepository
- func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, error)
- func (repo *RedisJobRepository) AddRetryAttempt(jobId string) error
- func (repo *RedisJobRepository) DeleteJobs(jobs []*api.Job) (map[*api.Job]error, error)
- func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ([]*api.Job, error)
- func (repo *RedisJobRepository) ExpireLeasesById(jobIds []string, deadline time.Time) ([]*api.Job, error)
- func (repo *RedisJobRepository) FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)
- func (repo *RedisJobRepository) GetActiveJobIds(queue string, jobSetId string) ([]string, error)
- func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error)
- func (repo *RedisJobRepository) GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)
- func (repo *RedisJobRepository) GetJobSetJobIds(queue string, jobSetId string, filter *JobSetFilter) ([]string, error)
- func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*JobResult, error)
- func (repo *RedisJobRepository) GetLeasedJobIds(queue string) ([]string, error)
- func (repo *RedisJobRepository) GetNumberOfRetryAttempts(jobId string) (int, error)
- func (repo *RedisJobRepository) GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)
- func (repo *RedisJobRepository) GetQueueJobIds(queueName string) ([]string, error)
- func (repo *RedisJobRepository) GetQueueSizes(queues []*api.Queue) (sizes []int64, err error)
- func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error)
- func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error)
- func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)
- func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, jobIdsByQueue map[string][]string) (map[string][]string, error)
- func (repo *RedisJobRepository) UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)
- func (repo *RedisJobRepository) UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)
- type RedisQueueRepository
- func (r *RedisQueueRepository) CreateQueue(queue queue.Queue) error
- func (r *RedisQueueRepository) DeleteQueue(name string) error
- func (r *RedisQueueRepository) GetAllQueues() ([]queue.Queue, error)
- func (r *RedisQueueRepository) GetQueue(name string) (queue.Queue, error)
- func (r *RedisQueueRepository) UpdateQueue(queue queue.Queue) error
- type RedisSchedulingInfoRepository
- type RedisUsageRepository
- func (r *RedisUsageRepository) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)
- func (r *RedisUsageRepository) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)
- func (r *RedisUsageRepository) GetClusterPriority(clusterId string) (map[string]float64, error)
- func (r *RedisUsageRepository) GetClusterQueueResourceUsage() (map[string]*schedulerobjects.ClusterResourceUsageReport, error)
- func (r *RedisUsageRepository) GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)
- func (r *RedisUsageRepository) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error
- func (r *RedisUsageRepository) UpdateClusterLeased(report *api.ClusterLeasedReport) error
- func (r *RedisUsageRepository) UpdateClusterQueueResourceUsage(cluster string, resourceUsage *schedulerobjects.ClusterResourceUsageReport) error
- type RunInfo
- type SchedulingInfoRepository
- type StreamEventStore
- type SubmitJobResult
- type TestEventStore
- type UpdateJobResult
- type Usage
- type UsageRepository
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrJobNotFound ¶
func (*ErrJobNotFound) Error ¶
func (err *ErrJobNotFound) Error() string
type ErrQueueAlreadyExists ¶
type ErrQueueAlreadyExists struct {
QueueName string
}
func (*ErrQueueAlreadyExists) Error ¶
func (err *ErrQueueAlreadyExists) Error() string
type ErrQueueNotFound ¶
type ErrQueueNotFound struct {
QueueName string
}
func (*ErrQueueNotFound) Error ¶
func (err *ErrQueueNotFound) Error() string
type EventRepository ¶
type EventStore ¶
type EventStore interface {
ReportEvents(message []*api.EventMessage) error
}
type JobRepository ¶
type JobRepository interface {
	PeekQueue(queue string, limit int64) ([]*api.Job, error)
	// TryLeaseJobs attempts to lease a set of jobs to the executor with the given clusterId.
	// Takes as argument a map from queue name to slice of job ids to lease from that queue.
	// Returns a map from queue name to ids of successfully leased jobs for that queue.
	TryLeaseJobs(clusterId string, jobIdsByQueue map[string][]string) (map[string][]string, error)
	AddJobs(job []*api.Job) ([]*SubmitJobResult, error)
	GetJobsByIds(ids []string) ([]*JobResult, error)
	GetExistingJobsByIds(ids []string) ([]*api.Job, error)
	FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)
	GetQueueSizes(queues []*api.Queue) (sizes []int64, e error)
	GetQueueJobIds(queueName string) ([]string, error)
	RenewLease(clusterId string, jobIds []string) (renewed []string, e error)
	ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, e error)
	ExpireLeasesById(jobIds []string, deadline time.Time) (expired []*api.Job, e error)
	ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)
	DeleteJobs(jobs []*api.Job) (map[*api.Job]error, error)
	GetActiveJobIds(queue string, jobSetId string) ([]string, error)
	GetJobSetJobIds(queue string, jobSetId string, filter *JobSetFilter) ([]string, error)
	GetLeasedJobIds(queue string) ([]string, error)
	UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)
	UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)
	GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)
	GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)
	AddRetryAttempt(jobId string) error
	GetNumberOfRetryAttempts(jobId string) (int, error)
}
type JobResult ¶
JobResult is used by GetJobsByIds to bundle a job with any error that occurred when getting the job.
type JobSetFilter ¶
type JobStartInfo ¶
type QueueRepository ¶
type RedisEventRepository ¶
type RedisEventRepository struct {
// contains filtered or unexported fields
}
func NewEventRepository ¶
func NewEventRepository(db redis.UniversalClient) *RedisEventRepository
func (*RedisEventRepository) CheckStreamExists ¶
func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId string) (bool, error)
func (*RedisEventRepository) GetLastMessageId ¶
func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)
func (*RedisEventRepository) ReadEvents ¶
func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
type RedisHealth ¶
type RedisHealth struct {
// contains filtered or unexported fields
}
func NewRedisHealth ¶
func NewRedisHealth(db redis.UniversalClient) *RedisHealth
func (*RedisHealth) Check ¶
func (r *RedisHealth) Check() error
type RedisJobRepository ¶
type RedisJobRepository struct {
// contains filtered or unexported fields
}
func NewRedisJobRepository ¶
func NewRedisJobRepository( db redis.UniversalClient, retentionPolicy configuration.DatabaseRetentionPolicy, ) *RedisJobRepository
func (*RedisJobRepository) AddJobs ¶
func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, error)
func (*RedisJobRepository) AddRetryAttempt ¶
func (repo *RedisJobRepository) AddRetryAttempt(jobId string) error
func (*RedisJobRepository) DeleteJobs ¶
func (*RedisJobRepository) ExpireLeases ¶
ExpireLeases expires the leases on all jobs for the provided queue.
func (*RedisJobRepository) ExpireLeasesById ¶
func (*RedisJobRepository) FilterActiveQueues ¶
func (*RedisJobRepository) GetActiveJobIds ¶
func (repo *RedisJobRepository) GetActiveJobIds(queue string, jobSetId string) ([]string, error)
func (*RedisJobRepository) GetExistingJobsByIds ¶
func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error)
GetExistingJobsByIds queries Redis for job details. Missing jobs are omitted, i.e., the returned list may be shorter than the provided list of IDs.
func (*RedisJobRepository) GetJobRunInfos ¶
func (repo *RedisJobRepository) GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)
GetJobRunInfos returns run info for the cluster that each of the provided jobs is leased to. Jobs that are not leased to any cluster or that do not have a start time are omitted.
func (*RedisJobRepository) GetJobSetJobIds ¶
func (repo *RedisJobRepository) GetJobSetJobIds(queue string, jobSetId string, filter *JobSetFilter) ([]string, error)
func (*RedisJobRepository) GetJobsByIds ¶
func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*JobResult, error)
GetJobsByIds attempts to get all requested jobs from the database. Any error in getting a job is set to the Err field of the corresponding JobResult.
func (*RedisJobRepository) GetLeasedJobIds ¶
func (repo *RedisJobRepository) GetLeasedJobIds(queue string) ([]string, error)
func (*RedisJobRepository) GetNumberOfRetryAttempts ¶
func (repo *RedisJobRepository) GetNumberOfRetryAttempts(jobId string) (int, error)
func (*RedisJobRepository) GetQueueActiveJobSets ¶
func (repo *RedisJobRepository) GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)
GetQueueActiveJobSets returns a list of length equal to the number of unique job sets in the given queue, where each element contains the number of queued and leased jobs that are part of that job set.
func (*RedisJobRepository) GetQueueJobIds ¶
func (repo *RedisJobRepository) GetQueueJobIds(queueName string) ([]string, error)
func (*RedisJobRepository) GetQueueSizes ¶
func (repo *RedisJobRepository) GetQueueSizes(queues []*api.Queue) (sizes []int64, err error)
func (*RedisJobRepository) PeekQueue ¶
PeekQueue returns the highest-priority jobs in the given queue. At most limit jobs are returned.
func (*RedisJobRepository) RenewLease ¶
func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error)
func (*RedisJobRepository) ReturnLease ¶
func (*RedisJobRepository) TryLeaseJobs ¶
func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, jobIdsByQueue map[string][]string) (map[string][]string, error)
TryLeaseJobs attempts to lease jobs to the given cluster and returns a map from queue name to the ids of the jobs that were successfully leased from that queue.
func (*RedisJobRepository) UpdateJobs ¶
func (repo *RedisJobRepository) UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)
TODO Redis supports setting a retry parameter. Why do we re-implement that functionality?
func (*RedisJobRepository) UpdateStartTime ¶
func (repo *RedisJobRepository) UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)
type RedisQueueRepository ¶
type RedisQueueRepository struct {
// contains filtered or unexported fields
}
func NewRedisQueueRepository ¶
func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository
func (*RedisQueueRepository) CreateQueue ¶
func (r *RedisQueueRepository) CreateQueue(queue queue.Queue) error
func (*RedisQueueRepository) DeleteQueue ¶
func (r *RedisQueueRepository) DeleteQueue(name string) error
func (*RedisQueueRepository) GetAllQueues ¶
func (r *RedisQueueRepository) GetAllQueues() ([]queue.Queue, error)
func (*RedisQueueRepository) GetQueue ¶
func (r *RedisQueueRepository) GetQueue(name string) (queue.Queue, error)
func (*RedisQueueRepository) UpdateQueue ¶
func (r *RedisQueueRepository) UpdateQueue(queue queue.Queue) error
TODO If the queue to be updated is deleted between this method checking if the queue exists and making the update, the deleted queue is re-added to Redis. There's no "update if exists" operation in Redis, so we need to do this with a script or transaction.
type RedisSchedulingInfoRepository ¶
type RedisSchedulingInfoRepository struct {
// contains filtered or unexported fields
}
func NewRedisSchedulingInfoRepository ¶
func NewRedisSchedulingInfoRepository(db redis.UniversalClient) *RedisSchedulingInfoRepository
func (*RedisSchedulingInfoRepository) GetClusterSchedulingInfo ¶
func (r *RedisSchedulingInfoRepository) GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error)
func (*RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo ¶
func (r *RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error
type RedisUsageRepository ¶
type RedisUsageRepository struct {
// contains filtered or unexported fields
}
func NewRedisUsageRepository ¶
func NewRedisUsageRepository(db redis.UniversalClient) *RedisUsageRepository
func (*RedisUsageRepository) GetClusterLeasedReports ¶
func (r *RedisUsageRepository) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)
func (*RedisUsageRepository) GetClusterPriorities ¶
func (r *RedisUsageRepository) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)
GetClusterPriorities returns a map from clusterId to clusterPriority. This method makes a single aggregated call to the database, making it more efficient than calling GetClusterPriority repeatedly.
func (*RedisUsageRepository) GetClusterPriority ¶
func (r *RedisUsageRepository) GetClusterPriority(clusterId string) (map[string]float64, error)
func (*RedisUsageRepository) GetClusterQueueResourceUsage ¶
func (r *RedisUsageRepository) GetClusterQueueResourceUsage() (map[string]*schedulerobjects.ClusterResourceUsageReport, error)
func (*RedisUsageRepository) GetClusterUsageReports ¶
func (r *RedisUsageRepository) GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)
func (*RedisUsageRepository) UpdateCluster ¶
func (r *RedisUsageRepository) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error
func (*RedisUsageRepository) UpdateClusterLeased ¶
func (r *RedisUsageRepository) UpdateClusterLeased(report *api.ClusterLeasedReport) error
UpdateClusterLeased updates the count of resources leased to a particular cluster.
func (*RedisUsageRepository) UpdateClusterQueueResourceUsage ¶
func (r *RedisUsageRepository) UpdateClusterQueueResourceUsage(cluster string, resourceUsage *schedulerobjects.ClusterResourceUsageReport) error
type SchedulingInfoRepository ¶
type SchedulingInfoRepository interface { GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error) UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error }
type StreamEventStore ¶
func NewEventStore ¶
func NewEventStore(producer pulsar.Producer, maxAllowedMessageSize uint) *StreamEventStore
func (*StreamEventStore) ReportEvents ¶
func (n *StreamEventStore) ReportEvents(apiEvents []*api.EventMessage) error
type SubmitJobResult ¶
type SubmitJobResult struct { JobId string SubmittedJob *api.Job AlreadyProcessed bool DuplicateDetected bool Error error }
TODO DuplicateDetected should be removed in favour of setting the error to indicate the job already exists (e.g., by creating ErrJobExists).
type TestEventStore ¶
type TestEventStore struct {
ReceivedEvents []*api.EventMessage
}
func (*TestEventStore) ReportEvents ¶
func (es *TestEventStore) ReportEvents(message []*api.EventMessage) error
type UsageRepository ¶
type UsageRepository interface { GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error) GetClusterPriority(clusterId string) (map[string]float64, error) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error) GetClusterQueueResourceUsage() (map[string]*schedulerobjects.ClusterResourceUsageReport, error) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error UpdateClusterLeased(report *api.ClusterLeasedReport) error UpdateClusterQueueResourceUsage(cluster string, resourceUsage *schedulerobjects.ClusterResourceUsageReport) error }