repository

package
v0.3.75-rc-a678bb5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrJobNotFound

// ErrJobNotFound is an error value that carries a job id (JobId) and a
// cluster id (ClusterId). Its pointer type provides the Error() string method.
// NOTE(review): presumably returned when a job cannot be found for the given
// cluster; the name suggests this, but confirm against the call sites.
type ErrJobNotFound struct {
	JobId     string
	ClusterId string
}

func (*ErrJobNotFound) Error

func (err *ErrJobNotFound) Error() string

type ErrQueueAlreadyExists

// ErrQueueAlreadyExists is an error value that carries the name of a queue.
// Its pointer type provides the Error() string method.
// NOTE(review): presumably returned when creating a queue whose name is
// already taken; confirm against the call sites.
type ErrQueueAlreadyExists struct {
	QueueName string
}

func (*ErrQueueAlreadyExists) Error

func (err *ErrQueueAlreadyExists) Error() string

type ErrQueueNotFound

// ErrQueueNotFound is an error value that carries the name of a queue.
// Its pointer type provides the Error() string method.
// NOTE(review): presumably returned when a queue lookup by name finds
// nothing; confirm against the call sites.
type ErrQueueNotFound struct {
	QueueName string
}

func (*ErrQueueNotFound) Error

func (err *ErrQueueNotFound) Error() string

type EventRepository

// EventRepository is the read-side interface for event streams. Every method
// addresses a stream by a (queue, jobSetId) pair. *RedisEventRepository
// provides all three methods.
type EventRepository interface {
	// CheckStreamExists returns a boolean for the given queue and job set.
	// NOTE(review): presumably true when a stream exists for that pair; confirm.
	CheckStreamExists(queue string, jobSetId string) (bool, error)
	// ReadEvents returns event stream messages for the given queue and job set.
	// NOTE(review): lastId, limit and block presumably mean "start after this
	// message id", "return at most this many messages" and "wait up to this
	// long for new messages"; confirm against the implementation.
	ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
	// GetLastMessageId returns a message id, as a string, for the given queue
	// and job set.
	// NOTE(review): presumably the id of the newest message in the stream; confirm.
	GetLastMessageId(queue, jobSetId string) (string, error)
}

type EventStore

// EventStore is the write-side interface for events. Both *StreamEventStore
// and *TestEventStore provide a ReportEvents method with this signature.
type EventStore interface {
	// ReportEvents accepts a batch of event messages and returns an error.
	// NOTE(review): presumably the whole batch is published or recorded; confirm
	// per implementation.
	ReportEvents(context.Context, []*api.EventMessage) error
}

type JobRepository

// JobRepository is the storage interface for jobs, the queues they sit in and
// the leases held on them. *RedisJobRepository provides every method listed
// here.
//
// Several methods return two levels of error: a per-item error (inside a
// result struct, a slice or a map) and a final error for the call as a whole.
type JobRepository interface {
	// PeekQueue returns the highest-priority jobs in the given queue.
	// At most limit jobs are returned.
	PeekQueue(queue string, limit int64) ([]*api.Job, error)
	// TryLeaseJobs attempts to lease a set of jobs to the executor with the given clusterId.
	// Takes as argument a map from queue name to slice of job ids to lease from that queue.
	// Returns a map from queue name to ids of successfully leased jobs for that queue.
	TryLeaseJobs(clusterId string, jobIdsByQueue map[string][]string) (map[string][]string, error)
	// AddJobs submits the given jobs and returns one SubmitJobResult per result.
	// NOTE(review): presumably one result per input job, in input order; confirm.
	AddJobs(jobs []*api.Job) ([]*SubmitJobResult, error)
	// GetJobsByIds attempts to get all requested jobs. Any error in getting a
	// job is set on the Error field of the corresponding JobResult.
	GetJobsByIds(ids []string) ([]*JobResult, error)
	// GetExistingJobsByIds returns job details for the given ids. Missing jobs
	// are omitted, i.e., the returned list may be shorter than ids.
	GetExistingJobsByIds(ids []string) ([]*api.Job, error)
	// FilterActiveQueues returns a subset of the given queues.
	// NOTE(review): "active" presumably means the queue has jobs; confirm.
	FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)
	// GetQueueSizes returns sizes for the given queues.
	// NOTE(review): presumably sizes[i] corresponds to queues[i]; confirm.
	GetQueueSizes(queues []*api.Queue) (sizes []int64, err error)
	// GetQueueJobIds returns job ids for the named queue.
	GetQueueJobIds(queueName string) ([]string, error)
	// RenewLease takes job ids and a cluster id, and returns the ids of the
	// jobs whose lease was renewed.
	RenewLease(clusterId string, jobIds []string) (renewed []string, err error)
	// ExpireLeases expires the leases on all jobs for the provided queue and
	// returns the jobs whose lease expired.
	// NOTE(review): deadline presumably selects leases older than it; confirm.
	ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, err error)
	// ExpireLeasesById is the counterpart of ExpireLeases that takes explicit
	// job ids instead of a queue.
	ExpireLeasesById(jobIds []string, deadline time.Time) (expired []*api.Job, err error)
	// ReturnLease takes a cluster id and a job id and returns the returned job.
	ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)
	// DeleteJobs deletes the given jobs and returns a map from job to error.
	// NOTE(review): whether successfully deleted jobs appear with a nil error or
	// are absent from the map is not visible here; confirm.
	DeleteJobs(jobs []*api.Job) (map[*api.Job]error, error)
	// GetActiveJobIds returns job ids for the given queue and job set.
	GetActiveJobIds(queue string, jobSetId string) ([]string, error)
	// GetJobSetJobIds returns job ids for the given queue and job set, narrowed
	// by filter (see JobSetFilter).
	GetJobSetJobIds(queue string, jobSetId string, filter *JobSetFilter) ([]string, error)
	// GetLeasedJobIds returns job ids for the given queue.
	// NOTE(review): presumably only the currently leased ones; confirm.
	GetLeasedJobIds(queue string) ([]string, error)
	// UpdateStartTime applies the given start infos and returns a slice of
	// errors.
	// NOTE(review): presumably one entry per jobStartInfos element; confirm.
	UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)
	// UpdateJobs passes jobs to mutator and returns one UpdateJobResult per
	// result.
	// NOTE(review): presumably the jobs loaded for ids are mutated in place and
	// then written back; confirm.
	UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)
	// GetJobRunInfos returns run info for the cluster that each of the provided
	// jobs is leased to. Jobs that are not leased to any cluster or that do not
	// have a start time are omitted.
	GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)
	// GetQueueActiveJobSets returns a list of length equal to the number of
	// unique job sets in the given queue, where each element contains the
	// number of queued and leased jobs that are part of that job set.
	GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)
	// AddRetryAttempt records a retry attempt for the given job.
	AddRetryAttempt(jobId string) error
	// GetNumberOfRetryAttempts returns an attempt count for the given job.
	GetNumberOfRetryAttempts(jobId string) (int, error)
	// StorePulsarSchedulerJobDetails stores the given job details.
	StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error
	// GetPulsarSchedulerJobDetails returns the details for a single job id.
	GetPulsarSchedulerJobDetails(jobId string) (*schedulerobjects.PulsarSchedulerJobDetails, error)
	// DeletePulsarSchedulerJobDetails deletes the details for the given job ids.
	DeletePulsarSchedulerJobDetails(jobIds []string) error
}

type JobResult

// JobResult is used by GetJobsByIds to bundle a job with any error that
// occurred when getting the job. JobId identifies the job, Job holds the job
// itself and Error holds the error, if any.
// NOTE(review): presumably Job is nil whenever Error is non-nil; confirm.
type JobResult struct {
	JobId string
	Job   *api.Job
	Error error
}

JobResult is used by GetJobsByIds to bundle a job with any error that occurred when getting the job.

type JobSetFilter

// JobSetFilter is the filter argument accepted by GetJobSetJobIds.
// NOTE(review): IncludeQueued and IncludeLeased presumably select whether
// queued and leased jobs, respectively, are part of the result; confirm
// against the implementation.
type JobSetFilter struct {
	IncludeQueued bool
	IncludeLeased bool
}

type JobStartInfo

// JobStartInfo is the element type of the slice passed to UpdateStartTime.
// It ties a job id and a cluster id to a start time.
type JobStartInfo struct {
	// Unique ID assigned to each job.
	JobId string
	// Name of the cluster (as specified in the executor config) the job is assigned to.
	ClusterId string
	// NOTE(review): presumably the time at which the job started running on
	// ClusterId; confirm.
	StartTime time.Time
}

type QueueRepository

// QueueRepository is the storage interface for queue.Queue values: list all,
// get by name, create, update and delete by name. *RedisQueueRepository
// provides all five methods.
type QueueRepository interface {
	// GetAllQueues returns every stored queue.
	GetAllQueues() ([]queue.Queue, error)
	// GetQueue returns the queue with the given name.
	// NOTE(review): presumably fails with *ErrQueueNotFound when absent; confirm.
	GetQueue(name string) (queue.Queue, error)
	// CreateQueue stores a new queue.
	// NOTE(review): presumably fails with *ErrQueueAlreadyExists on a name
	// clash; confirm.
	CreateQueue(queue.Queue) error
	// UpdateQueue updates an existing queue. The Redis implementation carries a
	// TODO: if the queue is deleted between the existence check and the update,
	// the deleted queue is re-added.
	UpdateQueue(queue.Queue) error
	// DeleteQueue removes the queue with the given name.
	DeleteQueue(name string) error
}

type RedisEventRepository

// RedisEventRepository is created by NewEventRepository from a
// redis.UniversalClient. Its pointer type provides CheckStreamExists,
// GetLastMessageId and ReadEvents, which is the method set of EventRepository.
type RedisEventRepository struct {
	// contains filtered or unexported fields
}

func NewEventRepository

func NewEventRepository(db redis.UniversalClient) *RedisEventRepository

func (*RedisEventRepository) CheckStreamExists

func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId string) (bool, error)

func (*RedisEventRepository) GetLastMessageId

func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)

func (*RedisEventRepository) ReadEvents

func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)

type RedisHealth

// RedisHealth is created by NewRedisHealth from a redis.UniversalClient and
// exposes a single Check() error method.
// NOTE(review): presumably Check reports whether Redis is reachable; confirm.
type RedisHealth struct {
	// contains filtered or unexported fields
}

func NewRedisHealth

func NewRedisHealth(db redis.UniversalClient) *RedisHealth

func (*RedisHealth) Check

func (r *RedisHealth) Check() error

type RedisJobRepository

// RedisJobRepository is created by NewRedisJobRepository from a
// redis.UniversalClient. Its pointer type provides the methods declared by
// JobRepository.
type RedisJobRepository struct {
	// contains filtered or unexported fields
}

func NewRedisJobRepository

func NewRedisJobRepository(
	db redis.UniversalClient,
) *RedisJobRepository

func (*RedisJobRepository) AddJobs

func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, error)

func (*RedisJobRepository) AddRetryAttempt

func (repo *RedisJobRepository) AddRetryAttempt(jobId string) error

func (*RedisJobRepository) DeleteJobs

func (repo *RedisJobRepository) DeleteJobs(jobs []*api.Job) (map[*api.Job]error, error)

func (*RedisJobRepository) DeletePulsarSchedulerJobDetails added in v0.3.49

func (repo *RedisJobRepository) DeletePulsarSchedulerJobDetails(jobIds []string) error

func (*RedisJobRepository) ExpireLeases

func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ([]*api.Job, error)

ExpireLeases expires the leases on all jobs for the provided queue.

func (*RedisJobRepository) ExpireLeasesById

func (repo *RedisJobRepository) ExpireLeasesById(jobIds []string, deadline time.Time) ([]*api.Job, error)

func (*RedisJobRepository) FilterActiveQueues

func (repo *RedisJobRepository) FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)

func (*RedisJobRepository) GetActiveJobIds

func (repo *RedisJobRepository) GetActiveJobIds(queue string, jobSetId string) ([]string, error)

func (*RedisJobRepository) GetExistingJobsByIds

func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error)

GetExistingJobsByIds queries Redis for job details. Missing jobs are omitted, i.e., the returned list may be shorter than the provided list of IDs.

func (*RedisJobRepository) GetJobRunInfos

func (repo *RedisJobRepository) GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)

GetJobRunInfos returns run info for the cluster that each of the provided jobs is leased to. Jobs that are not leased to any cluster or that do not have a start time are omitted.

func (*RedisJobRepository) GetJobSetJobIds

func (repo *RedisJobRepository) GetJobSetJobIds(queue string, jobSetId string, filter *JobSetFilter) ([]string, error)

func (*RedisJobRepository) GetJobsByIds

func (repo *RedisJobRepository) GetJobsByIds(ids []string) ([]*JobResult, error)

GetJobsByIds attempts to get all requested jobs from the database. Any error in getting a job is set on the Error field of the corresponding JobResult.

func (*RedisJobRepository) GetLeasedJobIds

func (repo *RedisJobRepository) GetLeasedJobIds(queue string) ([]string, error)

func (*RedisJobRepository) GetNumberOfRetryAttempts

func (repo *RedisJobRepository) GetNumberOfRetryAttempts(jobId string) (int, error)

func (*RedisJobRepository) GetPulsarSchedulerJobDetails added in v0.3.49

func (repo *RedisJobRepository) GetPulsarSchedulerJobDetails(jobId string) (*schedulerobjects.PulsarSchedulerJobDetails, error)

func (*RedisJobRepository) GetQueueActiveJobSets

func (repo *RedisJobRepository) GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)

GetQueueActiveJobSets returns a list of length equal to the number of unique job sets in the given queue, where each element contains the number of queued and leased jobs that are part of that job set.

func (*RedisJobRepository) GetQueueJobIds

func (repo *RedisJobRepository) GetQueueJobIds(queueName string) ([]string, error)

func (*RedisJobRepository) GetQueueSizes

func (repo *RedisJobRepository) GetQueueSizes(queues []*api.Queue) (sizes []int64, err error)

func (*RedisJobRepository) PeekQueue

func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error)

PeekQueue returns the highest-priority jobs in the given queue. At most limit jobs are returned.

func (*RedisJobRepository) RenewLease

func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error)

func (*RedisJobRepository) ReturnLease

func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)

func (*RedisJobRepository) StorePulsarSchedulerJobDetails added in v0.3.49

func (repo *RedisJobRepository) StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error

func (*RedisJobRepository) TryLeaseJobs

func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, jobIdsByQueue map[string][]string) (map[string][]string, error)

TryLeaseJobs attempts to assign jobs to a given cluster and returns a map from queue name to the ids of the jobs that were successfully leased from that queue.

func (*RedisJobRepository) UpdateJobs

func (repo *RedisJobRepository) UpdateJobs(ids []string, mutator func([]*api.Job)) ([]UpdateJobResult, error)

TODO Redis supports setting a retry parameter. Why do we re-implement that functionality?

func (*RedisJobRepository) UpdateStartTime

func (repo *RedisJobRepository) UpdateStartTime(jobStartInfos []*JobStartInfo) ([]error, error)

type RedisQueueRepository

// RedisQueueRepository is created by NewRedisQueueRepository from a
// redis.UniversalClient. Its pointer type provides the methods declared by
// QueueRepository.
type RedisQueueRepository struct {
	// contains filtered or unexported fields
}

func NewRedisQueueRepository

func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository

func (*RedisQueueRepository) CreateQueue

func (r *RedisQueueRepository) CreateQueue(queue queue.Queue) error

func (*RedisQueueRepository) DeleteQueue

func (r *RedisQueueRepository) DeleteQueue(name string) error

func (*RedisQueueRepository) GetAllQueues

func (r *RedisQueueRepository) GetAllQueues() ([]queue.Queue, error)

func (*RedisQueueRepository) GetQueue

func (r *RedisQueueRepository) GetQueue(name string) (queue.Queue, error)

func (*RedisQueueRepository) UpdateQueue

func (r *RedisQueueRepository) UpdateQueue(queue queue.Queue) error

TODO If the queue to be updated is deleted between this method checking if the queue exists and making the update, the deleted queue is re-added to Redis. There's no "update if exists" operation in Redis, so we need to do this with a script or transaction.

type RedisSchedulingInfoRepository

// RedisSchedulingInfoRepository's pointer type provides
// GetClusterSchedulingInfo and UpdateClusterSchedulingInfo, which is the
// method set of SchedulingInfoRepository.
// NOTE(review): no exported constructor is listed for this type and its fields
// are unexported; confirm how callers are expected to obtain a usable value.
type RedisSchedulingInfoRepository struct {
	// contains filtered or unexported fields
}

func (*RedisSchedulingInfoRepository) GetClusterSchedulingInfo

func (r *RedisSchedulingInfoRepository) GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error)

func (*RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo

func (r *RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error

type RedisUsageRepository

// RedisUsageRepository is created by NewRedisUsageRepository from a
// redis.UniversalClient. Its pointer type provides the methods declared by
// UsageRepository.
type RedisUsageRepository struct {
	// contains filtered or unexported fields
}

func NewRedisUsageRepository

func NewRedisUsageRepository(db redis.UniversalClient) *RedisUsageRepository

func (*RedisUsageRepository) GetClusterLeasedReports

func (r *RedisUsageRepository) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)

func (*RedisUsageRepository) GetClusterPriorities

func (r *RedisUsageRepository) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)

GetClusterPriorities returns a map from clusterId to clusterPriority. This method makes a single aggregated call to the database, making it more efficient than calling GetClusterPriority repeatedly.

func (*RedisUsageRepository) GetClusterPriority

func (r *RedisUsageRepository) GetClusterPriority(clusterId string) (map[string]float64, error)

func (*RedisUsageRepository) GetClusterQueueResourceUsage

func (r *RedisUsageRepository) GetClusterQueueResourceUsage() (map[string]*schedulerobjects.ClusterResourceUsageReport, error)

func (*RedisUsageRepository) GetClusterUsageReports

func (r *RedisUsageRepository) GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)

func (*RedisUsageRepository) UpdateCluster

func (r *RedisUsageRepository) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error

func (*RedisUsageRepository) UpdateClusterLeased

func (r *RedisUsageRepository) UpdateClusterLeased(report *api.ClusterLeasedReport) error

UpdateClusterLeased updates the count of resources leased to a particular cluster.

func (*RedisUsageRepository) UpdateClusterQueueResourceUsage

func (r *RedisUsageRepository) UpdateClusterQueueResourceUsage(cluster string, resourceUsage *schedulerobjects.ClusterResourceUsageReport) error

type RunInfo

// RunInfo is the map value returned by GetJobRunInfos, which reports run info
// for the cluster each job is leased to. StartTime is the job's start time and
// CurrentClusterId names that cluster.
// NOTE(review): the returned map is presumably keyed by job id; confirm.
type RunInfo struct {
	StartTime        time.Time
	CurrentClusterId string
}

type SchedulingInfoRepository

// SchedulingInfoRepository reads and writes cluster scheduling info reports.
// *RedisSchedulingInfoRepository provides both methods.
type SchedulingInfoRepository interface {
	// GetClusterSchedulingInfo returns the stored reports as a map.
	// NOTE(review): presumably keyed by cluster id; confirm.
	GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error)
	// UpdateClusterSchedulingInfo stores the given report.
	UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error
}

type StreamEventStore

// StreamEventStore holds a Pulsar producer and a maximum allowed message
// size. NewEventStore builds one from those two values, and its pointer type
// provides ReportEvents, the method declared by EventStore.
// NOTE(review): MaxAllowedMessageSize is presumably in bytes and presumably
// bounds each message handed to Producer; confirm.
type StreamEventStore struct {
	Producer              pulsar.Producer
	MaxAllowedMessageSize uint
}

func NewEventStore

func NewEventStore(producer pulsar.Producer, maxAllowedMessageSize uint) *StreamEventStore

func (*StreamEventStore) ReportEvents

func (n *StreamEventStore) ReportEvents(ctx context.Context, apiEvents []*api.EventMessage) error

type SubmitJobResult

// SubmitJobResult is the per-job element returned by AddJobs. It carries the
// job id, the submitted job, two status flags (AlreadyProcessed and
// DuplicateDetected) and a per-job Error.
//
// TODO DuplicateDetected should be removed in favour of setting the error to
// indicate the job already exists (e.g., by creating ErrJobExists).
type SubmitJobResult struct {
	JobId             string
	SubmittedJob      *api.Job
	AlreadyProcessed  bool
	DuplicateDetected bool
	Error             error
}

TODO DuplicateDetected should be removed in favour of setting the error to indicate the job already exists (e.g., by creating ErrJobExists).

type TestEventStore

// TestEventStore exposes a slice of event messages, ReceivedEvents. Its
// pointer type provides ReportEvents, the method declared by EventStore.
// NOTE(review): presumably a test double whose ReportEvents appends the
// messages it is given to ReceivedEvents; confirm.
type TestEventStore struct {
	ReceivedEvents []*api.EventMessage
}

func (*TestEventStore) ReportEvents

func (es *TestEventStore) ReportEvents(_ context.Context, message []*api.EventMessage) error

type UpdateJobResult

// UpdateJobResult is the per-job element returned by UpdateJobs. It carries
// the job id, the job and a per-job Error.
// NOTE(review): presumably Job is the job after the mutator ran and Error is
// nil on success; confirm.
type UpdateJobResult struct {
	JobId string
	Job   *api.Job
	Error error
}

type Usage

// Usage pairs two string-keyed float64 maps: PriorityPerQueue and
// CurrentUsagePerQueue.
// NOTE(review): both maps are presumably keyed by queue name. No function or
// method in this package's exported listing takes or returns Usage; confirm
// whether it is still in use.
type Usage struct {
	PriorityPerQueue     map[string]float64
	CurrentUsagePerQueue map[string]float64
}

type UsageRepository

// UsageRepository reads and writes per-cluster usage, priority and lease
// reports. *RedisUsageRepository provides every method listed here.
// NOTE(review): the outer map keys are presumably cluster ids throughout;
// confirm.
type UsageRepository interface {
	// GetClusterUsageReports returns the stored usage reports as a map.
	GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)
	// GetClusterPriority returns the priority map of a single cluster.
	// NOTE(review): the map is presumably keyed by queue name; confirm.
	GetClusterPriority(clusterId string) (map[string]float64, error)
	// GetClusterPriorities returns a map from clusterId to that cluster's
	// priority map. The Redis implementation makes a single aggregated call to
	// the database, which is more efficient than calling GetClusterPriority
	// repeatedly.
	GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)
	// GetClusterLeasedReports returns the stored leased reports as a map.
	GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)

	// GetClusterQueueResourceUsage returns the stored resource usage reports as
	// a map.
	GetClusterQueueResourceUsage() (map[string]*schedulerobjects.ClusterResourceUsageReport, error)

	// UpdateCluster stores a usage report together with a priority map.
	UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error
	// UpdateClusterLeased updates the count of resources leased to a particular
	// cluster.
	UpdateClusterLeased(report *api.ClusterLeasedReport) error
	// UpdateClusterQueueResourceUsage stores the resource usage report for the
	// named cluster.
	UpdateClusterQueueResourceUsage(cluster string, resourceUsage *schedulerobjects.ClusterResourceUsageReport) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL