repository

package
v0.2.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2021 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const JobNotFound = "no job found with provided Id"

Variables

View Source
var ErrQueueAlreadyExists = errors.New("Queue already exists")
View Source
var ErrQueueNotFound = errors.New("Queue does not exist")

Functions

This section is empty.

Types

type EventJobStatusProcessor added in v0.2.11

type EventJobStatusProcessor struct {
	// contains filtered or unexported fields
}

func NewEventJobStatusProcessor added in v0.2.11

func NewEventJobStatusProcessor(stream eventstream.EventStream, queue string, jobRepository JobRepository) *EventJobStatusProcessor

func (*EventJobStatusProcessor) Start added in v0.2.11

func (p *EventJobStatusProcessor) Start()

type EventRepository

type EventRepository interface {
	ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
	GetLastMessageId(queue, jobSetId string) (string, error)
}

type EventStore added in v0.1.6

type EventStore interface {
	ReportEvents(message []*api.EventMessage) error
}

type JobRepository

type JobRepository interface {
	PeekQueue(queue string, limit int64) ([]*api.Job, error)
	TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)
	AddJobs(job []*api.Job) ([]*SubmitJobResult, error)
	GetExistingJobsByIds(ids []string) ([]*api.Job, error)
	FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)
	GetQueueSizes(queues []*api.Queue) (sizes []int64, e error)
	IterateQueueJobs(queueName string, action func(*api.Job)) error
	GetQueueJobIds(queueName string) ([]string, error)
	RenewLease(clusterId string, jobIds []string) (renewed []string, e error)
	ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, e error)
	ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)
	DeleteJobs(jobs []*api.Job) map[*api.Job]error
	GetActiveJobIds(queue string, jobSetId string) ([]string, error)
	GetLeasedJobIds(queue string) ([]string, error)
	UpdateStartTime(jobId string, clusterId string, startTime time.Time) error
	UpdateJobs(ids []string, mutator func([]*api.Job)) []UpdateJobResult
	GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)
	GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)
	AddRetryAttempt(jobId string) error
	GetNumberOfRetryAttempts(jobId string) (int, error)
}

type QueueRepository

type QueueRepository interface {
	GetAllQueues() ([]*api.Queue, error)
	GetQueue(name string) (*api.Queue, error)
	CreateQueue(queue *api.Queue) error
	UpdateQueue(queue *api.Queue) error
	DeleteQueue(name string) error
}

type RedisEventProcessor added in v0.2.11

type RedisEventProcessor struct {
	// contains filtered or unexported fields
}

func NewEventRedisProcessor added in v0.2.11

func NewEventRedisProcessor(stream eventstream.EventStream, queue string, repository EventStore) *RedisEventProcessor

func (*RedisEventProcessor) Start added in v0.2.11

func (p *RedisEventProcessor) Start()

type RedisEventRepository

type RedisEventRepository struct {
	// contains filtered or unexported fields
}

func (*RedisEventRepository) GetLastMessageId added in v0.0.5

func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)

func (*RedisEventRepository) ReadEvents

func (repo *RedisEventRepository) ReadEvents(queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)

func (*RedisEventRepository) ReportEvent

func (repo *RedisEventRepository) ReportEvent(message *api.EventMessage) error

func (*RedisEventRepository) ReportEvents

func (repo *RedisEventRepository) ReportEvents(messages []*api.EventMessage) error

type RedisHealth added in v0.2.3

type RedisHealth struct {
	// contains filtered or unexported fields
}

func NewRedisHealth added in v0.2.3

func NewRedisHealth(db redis.UniversalClient) *RedisHealth

func (*RedisHealth) Check added in v0.2.3

func (r *RedisHealth) Check() error

type RedisJobRepository

type RedisJobRepository struct {
	// contains filtered or unexported fields
}

func (*RedisJobRepository) AddJobs added in v0.0.3

func (repo *RedisJobRepository) AddJobs(jobs []*api.Job) ([]*SubmitJobResult, error)

func (*RedisJobRepository) AddRetryAttempt added in v0.1.16

func (repo *RedisJobRepository) AddRetryAttempt(jobId string) error

func (*RedisJobRepository) DeleteJobs added in v0.0.10

func (repo *RedisJobRepository) DeleteJobs(jobs []*api.Job) map[*api.Job]error

func (*RedisJobRepository) ExpireLeases

func (repo *RedisJobRepository) ExpireLeases(queue string, deadline time.Time) ([]*api.Job, error)

func (*RedisJobRepository) FilterActiveQueues

func (repo *RedisJobRepository) FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)

func (*RedisJobRepository) GetActiveJobIds

func (repo *RedisJobRepository) GetActiveJobIds(queue string, jobSetId string) ([]string, error)

func (*RedisJobRepository) GetExistingJobsByIds added in v0.0.10

func (repo *RedisJobRepository) GetExistingJobsByIds(ids []string) ([]*api.Job, error)

Returns existing jobs by Id If an Id is supplied that no longer exists, that job will simply be omitted from the result. No error will be thrown for missing jobs

func (*RedisJobRepository) GetJobRunInfos added in v0.1.33

func (repo *RedisJobRepository) GetJobRunInfos(jobIds []string) (map[string]*RunInfo, error)

Returns the run info of each job id for the cluster they are currently associated with (leased by) Jobs with no value will be omitted from the results, which happens in the following cases: - The job is not associated with a cluster - The job has does not have a start time for the cluster it is associated with

func (*RedisJobRepository) GetLeasedJobIds added in v0.1.32

func (repo *RedisJobRepository) GetLeasedJobIds(queue string) ([]string, error)

func (*RedisJobRepository) GetNumberOfRetryAttempts added in v0.1.16

func (repo *RedisJobRepository) GetNumberOfRetryAttempts(jobId string) (int, error)

func (*RedisJobRepository) GetQueueActiveJobSets added in v0.1.2

func (repo *RedisJobRepository) GetQueueActiveJobSets(queue string) ([]*api.JobSetInfo, error)

func (*RedisJobRepository) GetQueueJobIds added in v0.1.26

func (repo *RedisJobRepository) GetQueueJobIds(queueName string) ([]string, error)

func (*RedisJobRepository) GetQueueSizes

func (repo *RedisJobRepository) GetQueueSizes(queues []*api.Queue) (sizes []int64, err error)

func (*RedisJobRepository) IterateQueueJobs added in v0.1.24

func (repo *RedisJobRepository) IterateQueueJobs(queueName string, action func(*api.Job)) error

func (*RedisJobRepository) PeekQueue

func (repo *RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error)

func (*RedisJobRepository) RenewLease

func (repo *RedisJobRepository) RenewLease(clusterId string, jobIds []string) (renewedJobIds []string, e error)

func (*RedisJobRepository) ReturnLease

func (repo *RedisJobRepository) ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)

func (*RedisJobRepository) TryLeaseJobs

func (repo *RedisJobRepository) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)

returns list of jobs which are successfully leased

func (*RedisJobRepository) UpdateJobs added in v0.2.6

func (repo *RedisJobRepository) UpdateJobs(ids []string, mutator func([]*api.Job)) []UpdateJobResult

func (*RedisJobRepository) UpdateStartTime added in v0.1.32

func (repo *RedisJobRepository) UpdateStartTime(jobId string, clusterId string, startTime time.Time) error

type RedisQueueRepository

type RedisQueueRepository struct {
	// contains filtered or unexported fields
}

func NewRedisQueueRepository

func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository

func (*RedisQueueRepository) CreateQueue

func (r *RedisQueueRepository) CreateQueue(queue *api.Queue) error

func (*RedisQueueRepository) DeleteQueue added in v0.1.14

func (r *RedisQueueRepository) DeleteQueue(name string) error

func (*RedisQueueRepository) GetAllQueues

func (r *RedisQueueRepository) GetAllQueues() ([]*api.Queue, error)

func (*RedisQueueRepository) GetQueue

func (r *RedisQueueRepository) GetQueue(name string) (*api.Queue, error)

func (*RedisQueueRepository) UpdateQueue added in v0.2.3

func (r *RedisQueueRepository) UpdateQueue(queue *api.Queue) error

type RedisSchedulingInfoRepository added in v0.1.6

type RedisSchedulingInfoRepository struct {
	// contains filtered or unexported fields
}

func NewRedisSchedulingInfoRepository added in v0.1.6

func NewRedisSchedulingInfoRepository(db redis.UniversalClient) *RedisSchedulingInfoRepository

func (*RedisSchedulingInfoRepository) GetClusterSchedulingInfo added in v0.1.6

func (r *RedisSchedulingInfoRepository) GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error)

func (*RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo added in v0.1.6

func (r *RedisSchedulingInfoRepository) UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error

type RedisUsageRepository

type RedisUsageRepository struct {
	// contains filtered or unexported fields
}

func NewRedisUsageRepository

func NewRedisUsageRepository(db redis.UniversalClient) *RedisUsageRepository

func (*RedisUsageRepository) GetClusterLeasedReports added in v0.1.6

func (r *RedisUsageRepository) GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)

func (*RedisUsageRepository) GetClusterPriorities

func (r *RedisUsageRepository) GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)

func (*RedisUsageRepository) GetClusterPriority

func (r *RedisUsageRepository) GetClusterPriority(clusterId string) (map[string]float64, error)

func (*RedisUsageRepository) GetClusterUsageReports

func (r *RedisUsageRepository) GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)

func (*RedisUsageRepository) UpdateCluster

func (r *RedisUsageRepository) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error

func (*RedisUsageRepository) UpdateClusterLeased added in v0.1.6

func (r *RedisUsageRepository) UpdateClusterLeased(report *api.ClusterLeasedReport) error

type RunInfo added in v0.1.33

type RunInfo struct {
	StartTime        time.Time
	CurrentClusterId string
}

type SchedulingInfoRepository added in v0.1.6

type SchedulingInfoRepository interface {
	GetClusterSchedulingInfo() (map[string]*api.ClusterSchedulingInfoReport, error)
	UpdateClusterSchedulingInfo(report *api.ClusterSchedulingInfoReport) error
}

type StreamEventStore added in v0.2.11

type StreamEventStore struct {
	// contains filtered or unexported fields
}

func NewEventStore added in v0.2.11

func NewEventStore(stream eventstream.EventStream) *StreamEventStore

func (*StreamEventStore) ReportEvents added in v0.2.11

func (n *StreamEventStore) ReportEvents(messages []*api.EventMessage) error

type SubmitJobResult added in v0.0.3

type SubmitJobResult struct {
	JobId             string
	SubmittedJob      *api.Job
	DuplicateDetected bool
	Error             error
}

type UpdateJobResult added in v0.2.8

type UpdateJobResult struct {
	JobId string
	Job   *api.Job
	Error error
}

type Usage

type Usage struct {
	PriorityPerQueue     map[string]float64
	CurrentUsagePerQueue map[string]float64
}

type UsageRepository

type UsageRepository interface {
	GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)
	GetClusterPriority(clusterId string) (map[string]float64, error)
	GetClusterPriorities(clusterIds []string) (map[string]map[string]float64, error)
	GetClusterLeasedReports() (map[string]*api.ClusterLeasedReport, error)

	UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error
	UpdateClusterLeased(report *api.ClusterLeasedReport) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL