server

package
v0.3.79-rc-e26590a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2023 License: Apache-2.0 Imports: 63 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregatedQueueServer

type AggregatedQueueServer struct {

	// For storing reports of scheduling attempts.
	SchedulingContextRepository *scheduler.SchedulingContextRepository
	// Stores the most recent NodeDb for each executor.
	// Used to check if a job could ever be scheduled at job submit time.
	SubmitChecker *scheduler.SubmitChecker
	// contains filtered or unexported fields
}

func NewAggregatedQueueServer

func NewAggregatedQueueServer(
	permissions authorization.PermissionChecker,
	schedulingConfig configuration.SchedulingConfig,
	jobRepository repository.JobRepository,
	queueRepository repository.QueueRepository,
	usageRepository repository.UsageRepository,
	eventStore repository.EventStore,
	schedulingInfoRepository repository.SchedulingInfoRepository,
	pulsarProducer pulsar.Producer,
	maxPulsarMessageSize uint,
	executorRepository database.ExecutorRepository,
) *AggregatedQueueServer

func (*AggregatedQueueServer) RenewLease

func (q *AggregatedQueueServer) RenewLease(ctx context.Context, request *api.RenewLeaseRequest) (*api.IdList, error)

func (*AggregatedQueueServer) ReportDone

func (q *AggregatedQueueServer) ReportDone(ctx context.Context, idList *api.IdList) (*api.IdList, error)

func (*AggregatedQueueServer) ReturnLease

func (q *AggregatedQueueServer) ReturnLease(ctx context.Context, request *api.ReturnLeaseRequest) (*types.Empty, error)

func (*AggregatedQueueServer) StreamingLeaseJobs

StreamingLeaseJobs is called by the executor to request jobs for it to run. It streams jobs to the executor as quickly as it can and then waits to receive ids back. Only jobs for which an id was sent back are marked as leased.

This function should be used instead of the LeaseJobs function in most cases.

type CancelJobPayload added in v0.3.68

type CancelJobPayload struct {
	JobId  string
	Reason string
}

type CancelledJobPayload added in v0.3.68

type CancelledJobPayload struct {
	// contains filtered or unexported fields
}

type ErrUnauthorized

type ErrUnauthorized struct {
	// Principal that attempted the action
	Principal authorization.Principal
	// Reasons that the principal was not allowed to perform the action
	// For example ["does not own the queue and have SubmitJobs permissions", "does not have SubmitAnyJobs permissions"]
	Reasons []string
}

ErrUnauthorized represents an error that occurs when a client tries to perform some action through the gRPC API for which it does not have permissions. Produces error messages of the form: "Tom" does not own the queue and have SubmitJobs permissions, "Tom" does not have SubmitAnyJobs permissions.

The caller of a function that may produce this error should capture it using errors.As and prepend whatever action the principal was attempting.

func MergePermissionErrors

func MergePermissionErrors(errs ...*ErrUnauthorized) *ErrUnauthorized

func (*ErrUnauthorized) Error

func (err *ErrUnauthorized) Error() string

type EventServer

type EventServer struct {
	// contains filtered or unexported fields
}

func NewEventServer

func NewEventServer(
	permissions authorization.PermissionChecker,
	eventRepository repository.EventRepository,
	eventStore repository.EventStore,
	queueRepository repository.QueueRepository,
	jobRepository repository.JobRepository,
) *EventServer

func (*EventServer) GetJobSetEvents

func (s *EventServer) GetJobSetEvents(request *api.JobSetRequest, stream api.Event_GetJobSetEventsServer) error

GetJobSetEvents streams back all events associated with a particular job set.

func (*EventServer) Health

func (s *EventServer) Health(ctx context.Context, cont_ *types.Empty) (*api.HealthCheckResponse, error)

func (*EventServer) Report

func (s *EventServer) Report(ctx context.Context, message *api.EventMessage) (*types.Empty, error)

func (*EventServer) ReportMultiple

func (s *EventServer) ReportMultiple(ctx context.Context, message *api.EventList) (*types.Empty, error)

func (*EventServer) Watch

func (s *EventServer) Watch(req *api.WatchRequest, stream api.Event_WatchServer) error

type EventsPrinter

type EventsPrinter struct {
	Client           pulsar.Client
	Topic            string
	SubscriptionName string
	// Logger from which the loggers used by this service are derived
	// (e.g., using srv.Logger.WithField), or nil, in which case the global logrus logger is used.
	Logger *logrus.Entry
}

EventsPrinter is a service that prints all events passing through pulsar to a logger. This service is only meant for use during development; it will be slow when the number of events is large.

func (*EventsPrinter) Run

func (srv *EventsPrinter) Run(ctx context.Context) error

Run the service that reads from Pulsar and prints each event to the logger until the provided context is cancelled.

type PulsarSubmitServer

type PulsarSubmitServer struct {
	api.UnimplementedSubmitServer
	Producer        pulsar.Producer
	Permissions     authorization.PermissionChecker
	QueueRepository repository.QueueRepository
	// Maximum size of Pulsar messages
	MaxAllowedMessageSize uint
	// Fall back to the legacy submit server for queue administration endpoints.
	SubmitServer *SubmitServer
	// Used for job submission deduplication.
	KVStore *pgkeyvalue.PGKeyValueStore
	// Used to check at job submit time if the job could ever be scheduled on either legacy or pulsar schedulers
	PulsarSchedulerSubmitChecker *scheduler.SubmitChecker
	LegacySchedulerSubmitChecker *scheduler.SubmitChecker
	// Flag to control if we enable sending messages to the pulsar scheduler
	PulsarSchedulerEnabled bool
	// Probability of using the pulsar scheduler.  Has no effect if PulsarSchedulerEnabled is false
	ProbabilityOfUsingPulsarScheduler float64
	// Used to assign a job to either legacy or pulsar schedulers. Injected here to allow repeatable tests
	Rand *rand.Rand
	// Gang id annotation. Needed because we cannot split a gang across schedulers.
	GangIdAnnotation string
	// Temporary flag to stop us rejecting jobs as we switch over to new submit checks
	IgnoreJobSubmitChecks bool
}

PulsarSubmitServer is a service that accepts API calls according to the original Armada submit API and publishes messages to Pulsar based on those calls. TODO: Consider returning a list of message ids of the messages generated. TODO: Include job set as the message key for each message.

func (*PulsarSubmitServer) Authorize

func (srv *PulsarSubmitServer) Authorize(
	ctx context.Context,
	queueName string,
	anyPerm permission.Permission,
	perm queue.PermissionVerb,
) (userId string, groups []string, err error)

Authorize authorises a user request to submit a state transition message to the log. User information used for authorization is extracted from the provided context. Checks that the user has either anyPerm (e.g., permissions.SubmitAnyJobs) or perm (e.g., PermissionVerbSubmit) for this queue. Returns the userId and groups extracted from the context.

func (*PulsarSubmitServer) CancelJobSet

func (srv *PulsarSubmitServer) CancelJobSet(ctx context.Context, req *api.JobSetCancelRequest) (*types.Empty, error)

func (*PulsarSubmitServer) CancelJobs

func (*PulsarSubmitServer) CreateQueue

func (srv *PulsarSubmitServer) CreateQueue(ctx context.Context, req *api.Queue) (*types.Empty, error)

Fallback methods. Calls into an embedded server.SubmitServer.

func (*PulsarSubmitServer) CreateQueues

func (*PulsarSubmitServer) DeleteQueue

func (srv *PulsarSubmitServer) DeleteQueue(ctx context.Context, req *api.QueueDeleteRequest) (*types.Empty, error)

func (*PulsarSubmitServer) GetQueue

func (srv *PulsarSubmitServer) GetQueue(ctx context.Context, req *api.QueueGetRequest) (*api.Queue, error)

func (*PulsarSubmitServer) GetQueueInfo

func (srv *PulsarSubmitServer) GetQueueInfo(ctx context.Context, req *api.QueueInfoRequest) (*api.QueueInfo, error)

func (*PulsarSubmitServer) ReprioritizeJobs

func (*PulsarSubmitServer) SubmitJobs

func (*PulsarSubmitServer) UpdateQueue

func (srv *PulsarSubmitServer) UpdateQueue(ctx context.Context, req *api.Queue) (*types.Empty, error)

func (*PulsarSubmitServer) UpdateQueues

type SchedulerJobRepositoryAdapter added in v0.3.50

type SchedulerJobRepositoryAdapter struct {
	// contains filtered or unexported fields
}

func (*SchedulerJobRepositoryAdapter) GetExistingJobsByIds added in v0.3.50

func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) ([]schedulerinterfaces.LegacySchedulerJob, error)

func (*SchedulerJobRepositoryAdapter) GetQueueJobIds added in v0.3.50

func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) ([]string, error)

type SubmitFromLog

type SubmitFromLog struct {
	SubmitServer *SubmitServer
	Consumer     pulsar.Consumer
	// Logger from which the loggers used by this service are derived
	// (e.g., using srv.Logger.WithField), or nil, in which case the global logrus logger is used.
	Logger *logrus.Entry
}

SubmitFromLog is a service that reads messages from Pulsar and updates the state of the Armada server accordingly (in particular, it writes to Redis). Calls into an embedded Armada submit server object.

func (*SubmitFromLog) BatchedCancelJobsById

func (srv *SubmitFromLog) BatchedCancelJobsById(ctx context.Context, userId string, cancelJobPayloads []*CancelJobPayload) (bool, error)

func (*SubmitFromLog) CancelJobSet

func (srv *SubmitFromLog) CancelJobSet(ctx context.Context, userId string, queueName string, jobSetName string, reason string) (bool, error)

func (*SubmitFromLog) CancelJobSets

func (srv *SubmitFromLog) CancelJobSets(
	ctx context.Context,
	userId string,
	queueName string,
	jobSetName string,
	events []*armadaevents.CancelJobSet,
) (bool, error)

CancelJobSets processes several CancelJobSet events. Because event sequences are specific to queue and job set, all CancelJobSet events in a sequence are equivalent, and we only need to call CancelJobSet once.

func (*SubmitFromLog) CancelJobs

func (srv *SubmitFromLog) CancelJobs(ctx context.Context, userId string, es []*armadaevents.CancelJob) (bool, error)

CancelJobs cancels all jobs specified by the provided events in a single operation.

func (*SubmitFromLog) CancelJobsById

func (srv *SubmitFromLog) CancelJobsById(ctx context.Context, userId string, cancelJobPayloads []*CancelJobPayload) ([]string, error)

CancelJobsById cancels all jobs with the specified ids.

func (*SubmitFromLog) DeleteFailedJobs added in v0.3.53

func (srv *SubmitFromLog) DeleteFailedJobs(ctx context.Context, es []*armadaevents.EventSequence_Event) (bool, error)

func (*SubmitFromLog) ProcessSequence

func (srv *SubmitFromLog) ProcessSequence(ctx context.Context, sequence *armadaevents.EventSequence) bool

ProcessSequence processes all events in a particular sequence. For efficiency, we may process several events at a time. To maintain ordering, we only do so for subsequences of consecutive events of equal type. The returned bool indicates if the corresponding Pulsar message should be ack'd or not.

func (*SubmitFromLog) ProcessSubSequence

func (srv *SubmitFromLog) ProcessSubSequence(ctx context.Context, i int, sequence *armadaevents.EventSequence) (j int, err error)

ProcessSubSequence processes sequence.Events[i:j] (i.e., events i through j-1), where j is the index of the first event in the sequence of a type different from that of sequence.Events[i], or len(sequence.Events) if no such event exists in the sequence, and returns j.

Processing one such subsequence at a time preserves ordering between events of different types. For example, SubmitJob events are processed before CancelJob events that occur later in the sequence.

Events are processed by calling into the embedded srv.SubmitServer.

Not all events are handled by this processor since the legacy scheduler writes some transitions directly to the db.

func (*SubmitFromLog) ReprioritizeJobSet

func (srv *SubmitFromLog) ReprioritizeJobSet(
	ctx context.Context,
	userId string,
	queueName string,
	jobSetName string,
	e *armadaevents.ReprioritiseJobSet,
) (bool, error)

func (*SubmitFromLog) ReprioritizeJobSets

func (srv *SubmitFromLog) ReprioritizeJobSets(
	ctx context.Context,
	userId string,
	queueName string,
	jobSetName string,
	es []*armadaevents.ReprioritiseJobSet,
) (bool, error)

ReprioritizeJobSets updates the priority of several job sets. Returns a multierror containing all errors that occurred. Since repeating this operation is safe (setting the priority is idempotent), the bool indicating if events were processed is set to false if any job set failed.

func (*SubmitFromLog) ReprioritizeJobs

func (srv *SubmitFromLog) ReprioritizeJobs(ctx context.Context, userId string, es []*armadaevents.ReprioritiseJob) (bool, error)

ReprioritizeJobs updates the priority of one or more jobs.

func (*SubmitFromLog) Run

func (srv *SubmitFromLog) Run(ctx context.Context) error

Run the service that reads from Pulsar and updates Armada until the provided context is cancelled.

func (*SubmitFromLog) SubmitJobs

func (srv *SubmitFromLog) SubmitJobs(
	ctx context.Context,
	userId string,
	groups []string,
	queueName string,
	jobSetName string,
	es []*armadaevents.SubmitJob,
) (bool, error)

SubmitJobs processes several job submit events in bulk. It returns a boolean indicating if the events were processed and any error that occurred during processing. Specifically, events are not processed if writing to the database results in a network-related error. For any other error, the jobs are marked as failed and the events are considered to have been processed.

func (*SubmitFromLog) UpdateJobStartTimes

func (srv *SubmitFromLog) UpdateJobStartTimes(ctx context.Context, es []*armadaevents.EventSequence_Event) (bool, error)

UpdateJobStartTimes records the start time (in Redis) of one or more jobs.

type SubmitServer

type SubmitServer struct {
	// contains filtered or unexported fields
}

func NewSubmitServer

func NewSubmitServer(
	permissions authorization.PermissionChecker,
	jobRepository repository.JobRepository,
	queueRepository repository.QueueRepository,
	eventStore repository.EventStore,
	schedulingInfoRepository repository.SchedulingInfoRepository,
	cancelJobsBatchSize int,
	queueManagementConfig *configuration.QueueManagementConfig,
	schedulingConfig *configuration.SchedulingConfig,
) *SubmitServer

func (*SubmitServer) CancelJobSet

func (server *SubmitServer) CancelJobSet(ctx context.Context, request *api.JobSetCancelRequest) (*types.Empty, error)

func (*SubmitServer) CancelJobs

func (server *SubmitServer) CancelJobs(ctx context.Context, request *api.JobCancelRequest) (*api.CancellationResult, error)

CancelJobs cancels jobs identified by the request. If the request contains a job ID, only the job with that ID is cancelled. If the request contains a queue name and a job set ID, all jobs matching those are cancelled.

func (*SubmitServer) CreateQueue

func (server *SubmitServer) CreateQueue(ctx context.Context, request *api.Queue) (*types.Empty, error)

func (*SubmitServer) CreateQueues

func (server *SubmitServer) CreateQueues(ctx context.Context, request *api.QueueList) (*api.BatchQueueCreateResponse, error)

func (*SubmitServer) DeleteQueue

func (server *SubmitServer) DeleteQueue(ctx context.Context, request *api.QueueDeleteRequest) (*types.Empty, error)

func (*SubmitServer) GetQueue

func (server *SubmitServer) GetQueue(ctx context.Context, req *api.QueueGetRequest) (*api.Queue, error)

func (*SubmitServer) GetQueueInfo

func (server *SubmitServer) GetQueueInfo(ctx context.Context, req *api.QueueInfoRequest) (*api.QueueInfo, error)

func (*SubmitServer) Health

func (server *SubmitServer) Health(ctx context.Context, _ *types.Empty) (*api.HealthCheckResponse, error)

func (*SubmitServer) ReprioritizeJobs

func (server *SubmitServer) ReprioritizeJobs(ctx context.Context, request *api.JobReprioritizeRequest) (*api.JobReprioritizeResponse, error)

ReprioritizeJobs updates the priority of one or more jobs. Returns a map from job ID to any error (or nil if the call succeeded).

func (*SubmitServer) SubmitJobs

func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error)

func (*SubmitServer) UpdateQueue

func (server *SubmitServer) UpdateQueue(ctx context.Context, request *api.Queue) (*types.Empty, error)

func (*SubmitServer) UpdateQueues

func (server *SubmitServer) UpdateQueues(ctx context.Context, request *api.QueueList) (*api.BatchQueueUpdateResponse, error)

type UsageServer

type UsageServer struct {
	// contains filtered or unexported fields
}

func NewUsageServer

func NewUsageServer(
	permissions authorization.PermissionChecker,
	priorityHalfTime time.Duration,
	schedulingConfig *configuration.SchedulingConfig,
	usageRepository repository.UsageRepository,
	queueRepository repository.QueueRepository,
) *UsageServer

func (*UsageServer) ReportUsage

func (s *UsageServer) ReportUsage(ctx context.Context, report *api.ClusterUsageReport) (*types.Empty, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL