repository

package
v0.2.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrJobNotFound added in v0.2.15

type ErrJobNotFound struct {
	JobId     string
	ClusterId string
}

func (*ErrJobNotFound) Error added in v0.2.15

func (err *ErrJobNotFound) Error() string

type ErrQueueAlreadyExists added in v0.2.3

type ErrQueueAlreadyExists struct {
	QueueName string
}

func (*ErrQueueAlreadyExists) Error added in v0.2.15

func (err *ErrQueueAlreadyExists) Error() string

type ErrQueueNotFound added in v0.2.3

type ErrQueueNotFound struct {
	QueueName string
}

func (*ErrQueueNotFound) Error added in v0.2.15

func (err *ErrQueueNotFound) Error() string

type EventRepository

type EventRepository interface {
	CheckStreamExists(queue string, jobSetId string) (bool, error)
	ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
	GetLastMessageId(queue, jobSetId string) (string, error)
}

type EventStore added in v0.1.6

type EventStore interface {
	ReportEvents(message []*api.EventMessage) error
}

type JobRepository

type JobRepository interface {
	PeekQueue(queue string, limit int64) ([]*api.Job, error)
	TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)
	AddJobs(job []*api.Job) ([]*SubmitJobResult, error)
	GetJobsByIds(ids []string) ([]*JobResult, error)
	GetExistingJobsByIds(ids []string) ([]*api.Job, error)
	FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)
	GetQueueSizes(queues []*api.Queue) (sizes []int64, e error)
	IterateQueueJobs(queueName string, action func(*api.Job)) error
	GetQueueJobIds(queueName string) ([]string, error)
	RenewLease(clusterId string, jobIds []string) (renewed []string, e error)
	ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, e error)
	ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)
	DeleteJobs(jobs []*api.Job) (map[*api.Job]error, error)
	GetActiveJobIds(queue string, jobSetId string) ([]string, error)
	GetLeasedJobIds(queue string) ([]string, error)
	UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)
	UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)
	GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)
	GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)
	AddRetryAttempt(jobId string) error
	GetNumberOfRetryAttempts(jobId string) (int, error)
}

type JobResult added in v0.2.15

type JobResult struct {
	JobId string
	Job   *api.Job
	Error error
}

JobResult is used by GetJobsByIds to bundle a job with any error that occurred when getting the job.

type JobStartInfo added in v0.2.12

type JobStartInfo struct {
	// Unique ID assigned to each job.
	JobId string
	// Name of the cluster (as specified in the executor config) the job is assigned to.
	ClusterId string
	StartTime time.Time
}

type QueueRepository

type QueueRepository interface {
	GetAllQueues() ([]queue.Queue, error)
	GetQueue(name string) (queue.Queue, error)
	CreateQueue(queue.Queue) error
	UpdateQueue(queue.Queue) error
	DeleteQueue(name string) error
}

type RedisEventRepository

type RedisEventRepository struct {
	// contains filtered or unexported fields
}

func (*RedisEventRepository) CheckStreamExists added in v0.2.19

func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId string) (bool, error)

func (*RedisEventRepository) GetLastMessageId added in v0.0.5

func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)

func (*RedisEventRepository) ReadEvents

func (repo *RedisEventRepository) ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)

func (*RedisEventRepository) ReportEvent

func (repo *RedisEventRepository) ReportEvent(message *api.EventMessage) error

func (*RedisEventRepository) ReportEvents

func (repo *RedisEventRepository) ReportEvents(messages []*api.EventMessage) error

type RedisHealth added in v0.2.3

type RedisHealth struct {
	// contains filtered or unexported fields
}

func NewRedisHealth added in v0.2.3

func NewRedisHealth(db redis.UniversalClient) *RedisHealth

func (*RedisHealth) Check added in v0.2.3

func (r *RedisHealth) Check() error

type RedisJobRepository

type RedisJobRepository struct {
	// contains filtered or unexported fields
}

func (*RedisJobRepository) AddJobs added in v0.0.3

func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, error)

func (*RedisJobRepository) AddRetryAttempt added in v0.1.16

func (repo *RedisJobRepository) AddRetryAttempt(jobId string) error

func (*RedisJobRepository) DeleteJobs added in v0.0.10

func (repo *RedisJobRepository) DeleteJobs(jobs []*api.Job) (map[*api.Job]error, error)

func (*RedisJobRepository) ExpireLeases

func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ([]*api.Job, error)

func (*RedisJobRepository) FilterActiveQueues

func (repo *RedisJobRepository) FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)

func (*RedisJobRepository) GetActiveJobIds

func (repo *RedisJobRepository) GetActiveJobIds(queue string, jobSetId string) ([]string, error)

func (*RedisJobRepository) GetExistingJobsByIds added in v0.0.10

func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error)

GetExistingJobsByIds queries Redis for job details. Missing jobs are omitted, i.e., the returned list may be shorter than the provided list of IDs.

func (*RedisJobRepository) GetJobRunInfos added in v0.1.33

func (repo *RedisJobRepository) GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)

GetJobRunInfos returns run info for the cluster that each of the provided jobs is leased to. Jobs that are not leased to any cluster or that do not have a start time are omitted.

func (*RedisJobRepository) GetJobsByIds

func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*JobResult, error)

GetJobsByIds attempts to get all requested jobs from the database. Any error in getting a job is set to the Error field of the corresponding JobResult.

func (*RedisJobRepository) GetLeasedJobIds added in v0.1.32

func (repo *RedisJobRepository) GetLeasedJobIds(queue string) ([]string, error)

func (*RedisJobRepository) GetNumberOfRetryAttempts added in v0.1.16

func (repo *RedisJobRepository) GetNumberOfRetryAttempts(jobId string) (int, error)

func (*RedisJobRepository) GetQueueActiveJobSets added in v0.1.2

func (repo *RedisJobRepository) GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)

GetQueueActiveJobSets returns a list of length equal to the number of unique job sets in the given queue, where each element contains the number of queued and leased jobs that are part of that job set.

func (*RedisJobRepository) GetQueueJobIds added in v0.1.26

func (repo *RedisJobRepository) GetQueueJobIds(queueName string) ([]string, error)

func (*RedisJobRepository) GetQueueSizes

func (repo *RedisJobRepository) GetQueueSizes(queues []*api.Queue) (sizes []int64, err error)

func (*RedisJobRepository) IterateQueueJobs added in v0.1.24

func (repo *RedisJobRepository) IterateQueueJobs(queueName string, action func(*api.Job)) error

IterateQueueJobs calls action for each job in queue with name queueName.

TODO action should return an error, which could be propagated back to the caller of this method.

func (*RedisJobRepository) PeekQueue

func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error)

PeekQueue returns the highest-priority jobs in the given queue. At most limit jobs are returned.

func (*RedisJobRepository) RenewLease

func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error)

func (*RedisJobRepository) ReturnLease

func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)

func (*RedisJobRepository) TryLeaseJobs

func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)

TryLeaseJobs attempts to assign jobs to a given cluster and returns a list composed of the jobs that were successfully leased.

func (*RedisJobRepository) UpdateJobs added in v0.2.6

func (repo *RedisJobRepository) UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)

TODO Redis supports setting a retry parameter. Why do we re-implement that functionality?

func (*RedisJobRepository) UpdateStartTime added in v0.1.32

func (repo *RedisJobRepository) UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)

type RedisQueueRepository

type RedisQueueRepository struct {
	// contains filtered or unexported fields
}

func NewRedisQueueRepository

func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository

func (*RedisQueueRepository) CreateQueue

func (r *RedisQueueRepository) CreateQueue(queue queue.Queue) error

func (*RedisQueueRepository) DeleteQueue added in v0.1.14

func (r *RedisQueueRepository) DeleteQueue(name string) error

func (*RedisQueueRepository) GetAllQueues

func (r *RedisQueueRepository) GetAllQueues() ([]queue.Queue, error)

func (*RedisQueueRepository) GetQueue

func (r *RedisQueueRepository) GetQueue(name string) (queue.Queue, error)

func (*RedisQueueRepository) UpdateQueue added in v0.2.3

func (r *RedisQueueRepository) UpdateQueue(queue queue.Queue) error

TODO If the queue to be updated is deleted between this method checking if the queue exists and making the update, the deleted queue is re-added to Redis. There's no "update if exists" operation in Redis, so we need to do this with a script or transaction.

type RedisSchedulingInfoRepository added in v0.1.6

type RedisSchedulingInfoRepository struct {
	// contains filtered or unexported fields
}

func NewRedisSchedulingInfoRepository added in v0.1.6

func NewRedisSchedulingInfoRepository(db redis.UniversalClient) *RedisSchedulingInfoRepository

func (*RedisSchedulingInfoRepository) GetClusterSchedulingInfo added in v0.1.6

func (r *RedisSchedulingInfoRepository) GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error)

func (*RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo added in v0.1.6

func (r *RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error

type RedisUsageRepository

type RedisUsageRepository struct {
	// contains filtered or unexported fields
}

func NewRedisUsageRepository

func NewRedisUsageRepository(db redis.UniversalClient) *RedisUsageRepository

func (*RedisUsageRepository) GetClusterLeasedReports added in v0.1.6

func (r *RedisUsageRepository) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)

func (*RedisUsageRepository) GetClusterPriorities

func (r *RedisUsageRepository) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)

GetClusterPriorities returns a map from clusterId to clusterPriority. This method makes a single aggregated call to the database, making it more efficient than calling GetClusterPriority repeatedly.

func (*RedisUsageRepository) GetClusterPriority

func (r *RedisUsageRepository) GetClusterPriority(clusterId string) (map[string]float64, error)

func (*RedisUsageRepository) GetClusterUsageReports

func (r *RedisUsageRepository) GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)

func (*RedisUsageRepository) UpdateCluster

func (r *RedisUsageRepository) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error

func (*RedisUsageRepository) UpdateClusterLeased added in v0.1.6

func (r *RedisUsageRepository) UpdateClusterLeased(report *api.ClusterLeasedReport) error

UpdateClusterLeased updates the count of resources leased to a particular cluster.

type RunInfo added in v0.1.33

type RunInfo struct {
	StartTime        time.Time
	CurrentClusterId string
}

type SchedulingInfoRepository added in v0.1.6

type SchedulingInfoRepository interface {
	GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error)
	UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error
}

type SubmitJobResult added in v0.0.3

type SubmitJobResult struct {
	JobId             string
	SubmittedJob      *api.Job
	DuplicateDetected bool
	Error             error
}

TODO DuplicateDetected should be removed in favor of setting the error to indicate the job already exists (e.g., by creating ErrJobExists).

type UpdateJobResult added in v0.2.8

type UpdateJobResult struct {
	JobId string
	Job   *api.Job
	Error error
}

type Usage

type Usage struct {
	PriorityPerQueue     map[string]float64
	CurrentUsagePerQueue map[string]float64
}

type UsageRepository

type UsageRepository interface {
	GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)
	GetClusterPriority(clusterId string) (map[string]float64, error)
	GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)
	GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)

	UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error
	UpdateClusterLeased(report *api.ClusterLeasedReport) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL