server

package
v0.4.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2024 License: Apache-2.0 Imports: 67 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActionAuthorizer added in v0.4.5

type ActionAuthorizer interface {
	AuthorizeAction(ctx *armadacontext.Context, perm permission.Permission) error
	AuthorizeQueueAction(ctx *armadacontext.Context, queue queue.Queue, anyPerm permission.Permission, perm queue.PermissionVerb) error
}

type AggregatedQueueServer

type AggregatedQueueServer struct {

	// For storing reports of scheduling attempts.
	SchedulingContextRepository *scheduler.SchedulingContextRepository
	// Stores the most recent NodeDb for each executor.
	// Used to check if a job could ever be scheduled at job submit time.
	SubmitChecker *scheduler.SubmitChecker
	// contains filtered or unexported fields
}

func NewAggregatedQueueServer

func NewAggregatedQueueServer(
	authorizer ActionAuthorizer,
	schedulingConfig configuration.SchedulingConfig,
	jobRepository repository.JobRepository,
	queueRepository repository.QueueRepository,
	usageRepository repository.UsageRepository,
	eventStore repository.EventStore,
	schedulingInfoRepository repository.SchedulingInfoRepository,
	pulsarProducer pulsar.Producer,
	maxPulsarMessageSize uint,
	executorRepository database.ExecutorRepository,
) *AggregatedQueueServer

func (*AggregatedQueueServer) RenewLease

func (q *AggregatedQueueServer) RenewLease(grpcCtx context.Context, request *api.RenewLeaseRequest) (*api.IdList, error)

func (*AggregatedQueueServer) ReportDone

func (q *AggregatedQueueServer) ReportDone(grpcCtx context.Context, idList *api.IdList) (*api.IdList, error)

func (*AggregatedQueueServer) ReturnLease

func (q *AggregatedQueueServer) ReturnLease(grpcCtx context.Context, request *api.ReturnLeaseRequest) (*prototypes.Empty, error)

func (*AggregatedQueueServer) StreamingLeaseJobs

StreamingLeaseJobs is called by the executor to request jobs for it to run. It streams jobs to the executor as quickly as it can and then waits to receive ids back. Only jobs for which an id was sent back are marked as leased.

This function should be used instead of the LeaseJobs function in most cases.

type Authorizer added in v0.4.5

type Authorizer struct {
	// contains filtered or unexported fields
}

func NewAuthorizer added in v0.4.5

func NewAuthorizer(permissionChecker authorization.PermissionChecker) *Authorizer

func (*Authorizer) AuthorizeAction added in v0.4.5

func (b *Authorizer) AuthorizeAction(ctx *armadacontext.Context, perm permission.Permission) error

func (*Authorizer) AuthorizeQueueAction added in v0.4.5

func (b *Authorizer) AuthorizeQueueAction(
	ctx *armadacontext.Context,
	queue queue.Queue,
	anyPerm permission.Permission,
	perm queue.PermissionVerb,
) error

type CancelJobPayload added in v0.3.68

type CancelJobPayload struct {
	JobId  string
	Reason string
}

type CancelledJobPayload added in v0.3.68

type CancelledJobPayload struct {
	// contains filtered or unexported fields
}

type EventServer

type EventServer struct {
	// contains filtered or unexported fields
}

func NewEventServer

func NewEventServer(
	authorizer ActionAuthorizer,
	eventRepository repository.EventRepository,
	eventStore repository.EventStore,
	queueRepository repository.QueueRepository,
	jobRepository repository.JobRepository,
) *EventServer

func (*EventServer) GetJobSetEvents

func (s *EventServer) GetJobSetEvents(request *api.JobSetRequest, stream api.Event_GetJobSetEventsServer) error

GetJobSetEvents streams back all events associated with a particular job set.

func (*EventServer) Health

func (*EventServer) Report

func (s *EventServer) Report(grpcCtx context.Context, message *api.EventMessage) (*types.Empty, error)

func (*EventServer) ReportMultiple

func (s *EventServer) ReportMultiple(grpcCtx context.Context, message *api.EventList) (*types.Empty, error)

func (*EventServer) Watch

func (s *EventServer) Watch(req *api.WatchRequest, stream api.Event_WatchServer) error

type EventsPrinter

type EventsPrinter struct {
	Client           pulsar.Client
	Topic            string
	SubscriptionName string
	// Logger from which the loggers used by this service are derived
	// (e.g., using srv.Logger.WithField), or nil, in which case the global logrus logger is used.
	Logger *logrus.Entry
}

EventsPrinter is a service that prints all events passing through pulsar to a logger. This service is only meant for use during development; it will be slow when the number of events is large.

func (*EventsPrinter) Run

func (srv *EventsPrinter) Run(ctx *armadacontext.Context) error

Run the service that reads events from Pulsar and prints them to the logger until the provided context is cancelled.

type JobSubmitError added in v0.4.8

type JobSubmitError struct {
	JobErrorsDetails []*api.JobSubmitResponseItem
	Err              error
}

func (*JobSubmitError) Error added in v0.4.8

func (e *JobSubmitError) Error() string

type PulsarSubmitServer

type PulsarSubmitServer struct {
	api.UnimplementedSubmitServer
	Producer        pulsar.Producer
	QueueRepository repository.QueueRepository
	// Maximum size of Pulsar messages
	MaxAllowedMessageSize uint
	// Fall back to the legacy submit server for queue administration endpoints.
	SubmitServer *SubmitServer
	// Used for job submission deduplication.
	KVStore *pgkeyvalue.PGKeyValueStore
	// Used to check at job submit time if the job could ever be scheduled on either legacy or pulsar schedulers
	PulsarSchedulerSubmitChecker *scheduler.SubmitChecker
	LegacySchedulerSubmitChecker *scheduler.SubmitChecker
	// Flag to control if we enable sending messages to the pulsar scheduler
	PulsarSchedulerEnabled bool
	// Probability of using the pulsar scheduler.  Has no effect if PulsarSchedulerEnabled is false
	ProbabilityOfUsingPulsarScheduler float64
	// Used to assign a job to either legacy or pulsar schedulers. Injected here to allow repeatable tests
	Rand *rand.Rand
	// Gang id annotation. Needed because we cannot split a gang across schedulers.
	GangIdAnnotation string
	// Temporary flag to stop us rejecting jobs as we switch over to new submit checks
	IgnoreJobSubmitChecks bool
}

PulsarSubmitServer is a service that accepts API calls according to the original Armada submit API and publishes messages to Pulsar based on those calls.

TODO: Consider returning a list of message ids of the messages generated.

TODO: Include job set as the message key for each message.

func (*PulsarSubmitServer) Authorize

func (srv *PulsarSubmitServer) Authorize(
	ctx *armadacontext.Context,
	queueName string,
	anyPerm permission.Permission,
	perm queue.PermissionVerb,
) (string, []string, error)

Authorize authorises a user request to submit a state transition message to the log. User information used for authorization is extracted from the provided context. Checks that the user has either anyPerm (e.g., permissions.SubmitAnyJobs) or perm (e.g., PermissionVerbSubmit) for this queue. Returns the userId and groups extracted from the context.

func (*PulsarSubmitServer) CancelJobSet

func (srv *PulsarSubmitServer) CancelJobSet(grpcCtx context.Context, req *api.JobSetCancelRequest) (*types.Empty, error)

func (*PulsarSubmitServer) CancelJobs

func (*PulsarSubmitServer) CreateQueue

func (srv *PulsarSubmitServer) CreateQueue(ctx context.Context, req *api.Queue) (*types.Empty, error)

Fallback methods. Calls into an embedded server.SubmitServer.

func (*PulsarSubmitServer) CreateQueues

func (*PulsarSubmitServer) DeleteQueue

func (srv *PulsarSubmitServer) DeleteQueue(ctx context.Context, req *api.QueueDeleteRequest) (*types.Empty, error)

func (*PulsarSubmitServer) GetQueue

func (srv *PulsarSubmitServer) GetQueue(ctx context.Context, req *api.QueueGetRequest) (*api.Queue, error)

func (*PulsarSubmitServer) GetQueueInfo

func (srv *PulsarSubmitServer) GetQueueInfo(ctx context.Context, req *api.QueueInfoRequest) (*api.QueueInfo, error)

func (*PulsarSubmitServer) GetQueues added in v0.3.103

func (*PulsarSubmitServer) GetUser added in v0.4.10

func (srv *PulsarSubmitServer) GetUser(ctx *armadacontext.Context) string

func (*PulsarSubmitServer) ReprioritizeJobs

func (*PulsarSubmitServer) SubmitJobs

func (srv *PulsarSubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error)

func (*PulsarSubmitServer) UpdateQueue

func (srv *PulsarSubmitServer) UpdateQueue(ctx context.Context, req *api.Queue) (*types.Empty, error)

func (*PulsarSubmitServer) UpdateQueues

type SchedulerJobRepositoryAdapter added in v0.3.50

type SchedulerJobRepositoryAdapter struct {
	// contains filtered or unexported fields
}

func (*SchedulerJobRepositoryAdapter) GetExistingJobsByIds added in v0.3.50

func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) ([]schedulerinterfaces.LegacySchedulerJob, error)

func (*SchedulerJobRepositoryAdapter) GetQueueJobIds added in v0.3.50

func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) ([]string, error)

type SubmitFromLog

type SubmitFromLog struct {
	SubmitServer *SubmitServer
	Consumer     pulsar.Consumer
	// Logger from which the loggers used by this service are derived
	// (e.g., using srv.Logger.WithField), or nil, in which case the global logrus logger is used.
	Logger *logrus.Entry
}

SubmitFromLog is a service that reads messages from Pulsar and updates the state of the Armada server accordingly (in particular, it writes to Redis). Calls into an embedded Armada submit server object.

func (*SubmitFromLog) BatchedCancelJobsById

func (srv *SubmitFromLog) BatchedCancelJobsById(ctx *armadacontext.Context, userId string, cancelJobPayloads []*CancelJobPayload) (bool, error)

func (*SubmitFromLog) CancelJobSet

func (srv *SubmitFromLog) CancelJobSet(ctx *armadacontext.Context, userId string, queueName string, jobSetName string, reason string) (bool, error)

func (*SubmitFromLog) CancelJobSets

func (srv *SubmitFromLog) CancelJobSets(
	ctx *armadacontext.Context,
	userId string,
	queueName string,
	jobSetName string,
	events []*armadaevents.CancelJobSet,
) (bool, error)

CancelJobSets processes several CancelJobSet events. Because event sequences are specific to queue and job set, all CancelJobSet events in a sequence are equivalent, and we only need to call CancelJobSet once.

func (*SubmitFromLog) CancelJobs

func (srv *SubmitFromLog) CancelJobs(ctx *armadacontext.Context, userId string, es []*armadaevents.CancelJob) (bool, error)

CancelJobs cancels all jobs specified by the provided events in a single operation.

func (*SubmitFromLog) CancelJobsById

func (srv *SubmitFromLog) CancelJobsById(ctx *armadacontext.Context, userId string, cancelJobPayloads []*CancelJobPayload) ([]string, error)

CancelJobsById cancels all jobs with the specified ids.

func (*SubmitFromLog) DeleteFailedJobs added in v0.3.53

func (srv *SubmitFromLog) DeleteFailedJobs(ctx *armadacontext.Context, es []*armadaevents.EventSequence_Event) (bool, error)

func (*SubmitFromLog) ProcessSequence

func (srv *SubmitFromLog) ProcessSequence(ctx *armadacontext.Context, sequence *armadaevents.EventSequence) bool

ProcessSequence processes all events in a particular sequence. For efficiency, we may process several events at a time. To maintain ordering, we only do so for subsequences of consecutive events of equal type. The returned bool indicates if the corresponding Pulsar message should be ack'd or not.

func (*SubmitFromLog) ProcessSubSequence

func (srv *SubmitFromLog) ProcessSubSequence(ctx *armadacontext.Context, i int, sequence *armadaevents.EventSequence) (j int, err error)

ProcessSubSequence processes the events sequence.Events[i] through sequence.Events[j-1] (i.e., the slice sequence.Events[i:j]), where j is the index of the first event in the sequence of a type different from that of sequence.Events[i], or len(sequence.Events) if no such event exists in the sequence, and returns j.

Processing one such subsequence at a time preserves ordering between events of different types. For example, SubmitJob events are processed before CancelJob events that occur later in the sequence.

Events are processed by calling into the embedded srv.SubmitServer.

Not all events are handled by this processor since the legacy scheduler writes some transitions directly to the db.

func (*SubmitFromLog) ReprioritizeJobSet

func (srv *SubmitFromLog) ReprioritizeJobSet(
	ctx *armadacontext.Context,
	userId string,
	queueName string,
	jobSetName string,
	e *armadaevents.ReprioritiseJobSet,
) (bool, error)

func (*SubmitFromLog) ReprioritizeJobSets

func (srv *SubmitFromLog) ReprioritizeJobSets(
	ctx *armadacontext.Context,
	userId string,
	queueName string,
	jobSetName string,
	es []*armadaevents.ReprioritiseJobSet,
) (bool, error)

ReprioritizeJobSets updates the priority of several job sets. Returns a multierror containing all errors that occurred. Since repeating this operation is safe (setting the priority is idempotent), the bool indicating if events were processed is set to false if any job set failed.

func (*SubmitFromLog) ReprioritizeJobs

func (srv *SubmitFromLog) ReprioritizeJobs(ctx *armadacontext.Context, userId string, es []*armadaevents.ReprioritiseJob) (bool, error)

ReprioritizeJobs updates the priority of one or more jobs.

func (*SubmitFromLog) Run

func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error

Run the service that reads from Pulsar and updates Armada until the provided context is cancelled.

func (*SubmitFromLog) SubmitJobs

func (srv *SubmitFromLog) SubmitJobs(
	ctx *armadacontext.Context,
	userId string,
	groups []string,
	queueName string,
	jobSetName string,
	es []*armadaevents.SubmitJob,
) (bool, error)

SubmitJobs processes several job submit events in bulk. It returns a boolean indicating if the events were processed and any error that occurred during processing. Specifically, events are not processed if writing to the database results in a network-related error. For any other error, the jobs are marked as failed and the events are considered to have been processed.

func (*SubmitFromLog) UpdateJobStartTimes

func (srv *SubmitFromLog) UpdateJobStartTimes(ctx *armadacontext.Context, es []*armadaevents.EventSequence_Event) (bool, error)

UpdateJobStartTimes records the start time (in Redis) of one or more jobs.

type SubmitServer

type SubmitServer struct {
	// contains filtered or unexported fields
}

func NewSubmitServer

func NewSubmitServer(
	authorizer ActionAuthorizer,
	jobRepository repository.JobRepository,
	queueRepository repository.QueueRepository,
	eventStore repository.EventStore,
	schedulingInfoRepository repository.SchedulingInfoRepository,
	cancelJobsBatchSize int,
	queueManagementConfig *configuration.QueueManagementConfig,
	schedulingConfig *configuration.SchedulingConfig,
) *SubmitServer

func (*SubmitServer) CancelJobSet

func (server *SubmitServer) CancelJobSet(grpcCtx context.Context, request *api.JobSetCancelRequest) (*types.Empty, error)

func (*SubmitServer) CancelJobs

func (server *SubmitServer) CancelJobs(grpcCtx context.Context, request *api.JobCancelRequest) (*api.CancellationResult, error)

CancelJobs cancels jobs identified by the request. If the request contains a job ID, only the job with that ID is cancelled. If the request contains a queue name and a job set ID, all jobs matching those are cancelled.

func (*SubmitServer) CreateQueue

func (server *SubmitServer) CreateQueue(grpcCtx context.Context, request *api.Queue) (*types.Empty, error)

func (*SubmitServer) CreateQueues

func (server *SubmitServer) CreateQueues(grpcCtx context.Context, request *api.QueueList) (*api.BatchQueueCreateResponse, error)

func (*SubmitServer) DeleteQueue

func (server *SubmitServer) DeleteQueue(grpcCtx context.Context, request *api.QueueDeleteRequest) (*types.Empty, error)

func (*SubmitServer) GetQueue

func (server *SubmitServer) GetQueue(grpcCtx context.Context, req *api.QueueGetRequest) (*api.Queue, error)

func (*SubmitServer) GetQueueInfo

func (server *SubmitServer) GetQueueInfo(grpcCtx context.Context, req *api.QueueInfoRequest) (*api.QueueInfo, error)

func (*SubmitServer) GetQueues added in v0.3.100

func (server *SubmitServer) GetQueues(req *api.StreamingQueueGetRequest, stream api.Submit_GetQueuesServer) error

func (*SubmitServer) Health

func (server *SubmitServer) Health(ctx context.Context, _ *types.Empty) (*api.HealthCheckResponse, error)

func (*SubmitServer) ReprioritizeJobs

func (server *SubmitServer) ReprioritizeJobs(grpcCtx context.Context, request *api.JobReprioritizeRequest) (*api.JobReprioritizeResponse, error)

ReprioritizeJobs updates the priority of one or more jobs. Returns a map from job ID to any error (or nil if the call succeeded).

func (*SubmitServer) SubmitJobs

func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error)

func (*SubmitServer) UpdateQueue

func (server *SubmitServer) UpdateQueue(grpcCtx context.Context, request *api.Queue) (*types.Empty, error)

func (*SubmitServer) UpdateQueues

func (server *SubmitServer) UpdateQueues(grpcCtx context.Context, request *api.QueueList) (*api.BatchQueueUpdateResponse, error)

type UsageServer

type UsageServer struct {
	// contains filtered or unexported fields
}

func NewUsageServer

func NewUsageServer(
	authorizer ActionAuthorizer,
	priorityHalfTime time.Duration,
	schedulingConfig *configuration.SchedulingConfig,
	usageRepository repository.UsageRepository,
	queueRepository repository.QueueRepository,
) *UsageServer

func (*UsageServer) ReportUsage

func (s *UsageServer) ReportUsage(grpcCtx context.Context, report *api.ClusterUsageReport) (*types.Empty, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL