Documentation ¶
Index ¶
- type AggregatedQueueServer
- func (q *AggregatedQueueServer) RenewLease(grpcCtx context.Context, request *api.RenewLeaseRequest) (*api.IdList, error)
- func (q *AggregatedQueueServer) ReportDone(grpcCtx context.Context, idList *api.IdList) (*api.IdList, error)
- func (q *AggregatedQueueServer) ReturnLease(grpcCtx context.Context, request *api.ReturnLeaseRequest) (*types.Empty, error)
- func (q *AggregatedQueueServer) StreamingLeaseJobs(stream api.AggregatedQueue_StreamingLeaseJobsServer) error
- type CancelJobPayload
- type CancelledJobPayload
- type ErrUnauthorized
- type EventServer
- func (s *EventServer) GetJobSetEvents(request *api.JobSetRequest, stream api.Event_GetJobSetEventsServer) error
- func (s *EventServer) Health(_ context.Context, _ *types.Empty) (*api.HealthCheckResponse, error)
- func (s *EventServer) Report(grpcCtx context.Context, message *api.EventMessage) (*types.Empty, error)
- func (s *EventServer) ReportMultiple(grpcCtx context.Context, message *api.EventList) (*types.Empty, error)
- func (s *EventServer) Watch(req *api.WatchRequest, stream api.Event_WatchServer) error
- type EventsPrinter
- type PulsarSubmitServer
- func (srv *PulsarSubmitServer) Authorize(ctx *armadacontext.Context, queueName string, anyPerm permission.Permission, ...) (userId string, groups []string, err error)
- func (srv *PulsarSubmitServer) CancelJobSet(grpcCtx context.Context, req *api.JobSetCancelRequest) (*types.Empty, error)
- func (srv *PulsarSubmitServer) CancelJobs(grpcCtx context.Context, req *api.JobCancelRequest) (*api.CancellationResult, error)
- func (srv *PulsarSubmitServer) CreateQueue(ctx context.Context, req *api.Queue) (*types.Empty, error)
- func (srv *PulsarSubmitServer) CreateQueues(ctx context.Context, req *api.QueueList) (*api.BatchQueueCreateResponse, error)
- func (srv *PulsarSubmitServer) DeleteQueue(ctx context.Context, req *api.QueueDeleteRequest) (*types.Empty, error)
- func (srv *PulsarSubmitServer) GetQueue(ctx context.Context, req *api.QueueGetRequest) (*api.Queue, error)
- func (srv *PulsarSubmitServer) GetQueueInfo(ctx context.Context, req *api.QueueInfoRequest) (*api.QueueInfo, error)
- func (srv *PulsarSubmitServer) ReprioritizeJobs(grpcCtx context.Context, req *api.JobReprioritizeRequest) (*api.JobReprioritizeResponse, error)
- func (srv *PulsarSubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error)
- func (srv *PulsarSubmitServer) UpdateQueue(ctx context.Context, req *api.Queue) (*types.Empty, error)
- func (srv *PulsarSubmitServer) UpdateQueues(ctx context.Context, req *api.QueueList) (*api.BatchQueueUpdateResponse, error)
- type SchedulerJobRepositoryAdapter
- type SubmitFromLog
- func (srv *SubmitFromLog) BatchedCancelJobsById(ctx *armadacontext.Context, userId string, ...) (bool, error)
- func (srv *SubmitFromLog) CancelJobSet(ctx *armadacontext.Context, userId string, queueName string, jobSetName string, ...) (bool, error)
- func (srv *SubmitFromLog) CancelJobSets(ctx *armadacontext.Context, userId string, queueName string, jobSetName string, ...) (bool, error)
- func (srv *SubmitFromLog) CancelJobs(ctx *armadacontext.Context, userId string, es []*armadaevents.CancelJob) (bool, error)
- func (srv *SubmitFromLog) CancelJobsById(ctx *armadacontext.Context, userId string, ...) ([]string, error)
- func (srv *SubmitFromLog) DeleteFailedJobs(ctx *armadacontext.Context, es []*armadaevents.EventSequence_Event) (bool, error)
- func (srv *SubmitFromLog) ProcessSequence(ctx *armadacontext.Context, sequence *armadaevents.EventSequence) bool
- func (srv *SubmitFromLog) ProcessSubSequence(ctx *armadacontext.Context, i int, sequence *armadaevents.EventSequence) (j int, err error)
- func (srv *SubmitFromLog) ReprioritizeJobSet(ctx *armadacontext.Context, userId string, queueName string, jobSetName string, ...) (bool, error)
- func (srv *SubmitFromLog) ReprioritizeJobSets(ctx *armadacontext.Context, userId string, queueName string, jobSetName string, ...) (bool, error)
- func (srv *SubmitFromLog) ReprioritizeJobs(ctx *armadacontext.Context, userId string, es []*armadaevents.ReprioritiseJob) (bool, error)
- func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error
- func (srv *SubmitFromLog) SubmitJobs(ctx *armadacontext.Context, userId string, groups []string, queueName string, ...) (bool, error)
- func (srv *SubmitFromLog) UpdateJobStartTimes(ctx *armadacontext.Context, es []*armadaevents.EventSequence_Event) (bool, error)
- type SubmitServer
- func (server *SubmitServer) CancelJobSet(grpcCtx context.Context, request *api.JobSetCancelRequest) (*types.Empty, error)
- func (server *SubmitServer) CancelJobs(grpcCtx context.Context, request *api.JobCancelRequest) (*api.CancellationResult, error)
- func (server *SubmitServer) CreateQueue(grpcCtx context.Context, request *api.Queue) (*types.Empty, error)
- func (server *SubmitServer) CreateQueues(grpcCtx context.Context, request *api.QueueList) (*api.BatchQueueCreateResponse, error)
- func (server *SubmitServer) DeleteQueue(grpcCtx context.Context, request *api.QueueDeleteRequest) (*types.Empty, error)
- func (server *SubmitServer) GetQueue(grpcCtx context.Context, req *api.QueueGetRequest) (*api.Queue, error)
- func (server *SubmitServer) GetQueueInfo(grpcCtx context.Context, req *api.QueueInfoRequest) (*api.QueueInfo, error)
- func (server *SubmitServer) GetQueues(req *api.StreamingQueueGetRequest, stream api.Submit_GetQueuesServer) error
- func (server *SubmitServer) Health(ctx context.Context, _ *types.Empty) (*api.HealthCheckResponse, error)
- func (server *SubmitServer) ReprioritizeJobs(grpcCtx context.Context, request *api.JobReprioritizeRequest) (*api.JobReprioritizeResponse, error)
- func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error)
- func (server *SubmitServer) UpdateQueue(grpcCtx context.Context, request *api.Queue) (*types.Empty, error)
- func (server *SubmitServer) UpdateQueues(grpcCtx context.Context, request *api.QueueList) (*api.BatchQueueUpdateResponse, error)
- type UsageServer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AggregatedQueueServer ¶
type AggregatedQueueServer struct { // For storing reports of scheduling attempts. SchedulingContextRepository *scheduler.SchedulingContextRepository // Stores the most recent NodeDb for each executor. // Used to check if a job could ever be scheduled at job submit time. SubmitChecker *scheduler.SubmitChecker // contains filtered or unexported fields }
func NewAggregatedQueueServer ¶
func NewAggregatedQueueServer( permissions authorization.PermissionChecker, schedulingConfig configuration.SchedulingConfig, jobRepository repository.JobRepository, queueRepository repository.QueueRepository, usageRepository repository.UsageRepository, eventStore repository.EventStore, schedulingInfoRepository repository.SchedulingInfoRepository, pulsarProducer pulsar.Producer, maxPulsarMessageSize uint, executorRepository database.ExecutorRepository, ) *AggregatedQueueServer
func (*AggregatedQueueServer) RenewLease ¶
func (q *AggregatedQueueServer) RenewLease(grpcCtx context.Context, request *api.RenewLeaseRequest) (*api.IdList, error)
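func (*AggregatedQueueServer) ReportDone ¶
func (q *AggregatedQueueServer) ReportDone(grpcCtx context.Context, idList *api.IdList) (*api.IdList, error)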
func (*AggregatedQueueServer) ReturnLease ¶
func (q *AggregatedQueueServer) ReturnLease(grpcCtx context.Context, request *api.ReturnLeaseRequest) (*types.Empty, error)
func (*AggregatedQueueServer) StreamingLeaseJobs ¶
func (q *AggregatedQueueServer) StreamingLeaseJobs(stream api.AggregatedQueue_StreamingLeaseJobsServer) error
StreamingLeaseJobs is called by the executor to request jobs for it to run. It streams jobs to the executor as quickly as it can and then waits to receive ids back. Only jobs for which an id was sent back are marked as leased.
This function should be used instead of the LeaseJobs function in most cases.
type CancelJobPayload ¶ added in v0.3.68
type CancelledJobPayload ¶ added in v0.3.68
type CancelledJobPayload struct {
// contains filtered or unexported fields
}
type ErrUnauthorized ¶
type ErrUnauthorized struct { Principal authorization.Principal // For example ["does not own the queue and have SubmitJobs permissions", "does not have SubmitAnyJobs permissions"] Reasons []string }
ErrUnauthorized represents an error that occurs when a client tries to perform some action through the gRPC API for which it does not have permissions. Produces error messages of the form "Tom" does not own the queue and have SubmitJobs permissions, "Tom" does not have SubmitAnyJobs permissions
The caller of a function that may produce this error should capture it using errors.As and prepend whatever action the principal was attempting.
func MergePermissionErrors ¶
func MergePermissionErrors(errs ...*ErrUnauthorized) *ErrUnauthorized
func (*ErrUnauthorized) Error ¶
func (err *ErrUnauthorized) Error() string
type EventServer ¶
type EventServer struct {
// contains filtered or unexported fields
}
func NewEventServer ¶
func NewEventServer( permissions authorization.PermissionChecker, eventRepository repository.EventRepository, eventStore repository.EventStore, queueRepository repository.QueueRepository, jobRepository repository.JobRepository, ) *EventServer
func (*EventServer) GetJobSetEvents ¶
func (s *EventServer) GetJobSetEvents(request *api.JobSetRequest, stream api.Event_GetJobSetEventsServer) error
GetJobSetEvents streams back all events associated with a particular job set.
func (*EventServer) Health ¶
func (s *EventServer) Health(_ context.Context, _ *types.Empty) (*api.HealthCheckResponse, error)
func (*EventServer) Report ¶
func (s *EventServer) Report(grpcCtx context.Context, message *api.EventMessage) (*types.Empty, error)
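func (*EventServer) ReportMultiple ¶
func (s *EventServer) ReportMultiple(grpcCtx context.Context, message *api.EventList) (*types.Empty, error)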
func (*EventServer) Watch ¶
func (s *EventServer) Watch(req *api.WatchRequest, stream api.Event_WatchServer) error
type EventsPrinter ¶
type EventsPrinter struct { Client pulsar.Client Topic string SubscriptionName string // Logger from which the loggers used by this service are derived // (e.g., using srv.Logger.WithField), or nil, in which case the global logrus logger is used. Logger *logrus.Entry }
EventsPrinter is a service that prints all events passing through pulsar to a logger. This service is only meant for use during development; it will be slow when the number of events is large.
func (*EventsPrinter) Run ¶
func (srv *EventsPrinter) Run(ctx *armadacontext.Context) error
Run the service that reads events from Pulsar and prints them to the logger until the provided context is cancelled.
type PulsarSubmitServer ¶
type PulsarSubmitServer struct { api.UnimplementedSubmitServer Producer pulsar.Producer Permissions authorization.PermissionChecker QueueRepository repository.QueueRepository // Maximum size of Pulsar messages MaxAllowedMessageSize uint // Fall back to the legacy submit server for queue administration endpoints. SubmitServer *SubmitServer // Used for job submission deduplication. KVStore *pgkeyvalue.PGKeyValueStore // Used to check at job submit time if the job could ever be scheduled on either legacy or pulsar schedulers PulsarSchedulerSubmitChecker *scheduler.SubmitChecker LegacySchedulerSubmitChecker *scheduler.SubmitChecker // Flag to control if we enable sending messages to the pulsar scheduler PulsarSchedulerEnabled bool // Probability of using the pulsar scheduler. Has no effect if PulsarSchedulerEnabled is false ProbabilityOfUsingPulsarScheduler float64 // Used to assign a job to either legacy or pulsar schedulers. Injected here to allow repeatable tests Rand *rand.Rand // Gang id annotation. Needed because we cannot split a gang across schedulers. GangIdAnnotation string // Temporary flag to stop us rejecting jobs as we switch over to new submit checks IgnoreJobSubmitChecks bool }
PulsarSubmitServer is a service that accepts API calls according to the original Armada submit API and publishes messages to Pulsar based on those calls. TODO: Consider returning a list of message ids of the messages generated TODO: Include job set as the message key for each message
func (*PulsarSubmitServer) Authorize ¶
func (srv *PulsarSubmitServer) Authorize( ctx *armadacontext.Context, queueName string, anyPerm permission.Permission, perm queue.PermissionVerb, ) (userId string, groups []string, err error)
Authorize authorises a user request to submit a state transition message to the log. User information used for authorization is extracted from the provided context. Checks that the user has either anyPerm (e.g., permissions.SubmitAnyJobs) or perm (e.g., PermissionVerbSubmit) for this queue. Returns the userId and groups extracted from the context.
func (*PulsarSubmitServer) CancelJobSet ¶
func (srv *PulsarSubmitServer) CancelJobSet(grpcCtx context.Context, req *api.JobSetCancelRequest) (*types.Empty, error)
func (*PulsarSubmitServer) CancelJobs ¶
func (srv *PulsarSubmitServer) CancelJobs(grpcCtx context.Context, req *api.JobCancelRequest) (*api.CancellationResult, error)
func (*PulsarSubmitServer) CreateQueue ¶
func (srv *PulsarSubmitServer) CreateQueue(ctx context.Context, req *api.Queue) (*types.Empty, error)
Fallback methods. Calls into an embedded server.SubmitServer.
func (*PulsarSubmitServer) CreateQueues ¶
func (srv *PulsarSubmitServer) CreateQueues(ctx context.Context, req *api.QueueList) (*api.BatchQueueCreateResponse, error)
func (*PulsarSubmitServer) DeleteQueue ¶
func (srv *PulsarSubmitServer) DeleteQueue(ctx context.Context, req *api.QueueDeleteRequest) (*types.Empty, error)
func (*PulsarSubmitServer) GetQueue ¶
func (srv *PulsarSubmitServer) GetQueue(ctx context.Context, req *api.QueueGetRequest) (*api.Queue, error)
func (*PulsarSubmitServer) GetQueueInfo ¶
func (srv *PulsarSubmitServer) GetQueueInfo(ctx context.Context, req *api.QueueInfoRequest) (*api.QueueInfo, error)
func (*PulsarSubmitServer) ReprioritizeJobs ¶
func (srv *PulsarSubmitServer) ReprioritizeJobs(grpcCtx context.Context, req *api.JobReprioritizeRequest) (*api.JobReprioritizeResponse, error)
func (*PulsarSubmitServer) SubmitJobs ¶
func (srv *PulsarSubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error)
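func (*PulsarSubmitServer) UpdateQueue ¶
func (srv *PulsarSubmitServer) UpdateQueue(ctx context.Context, req *api.Queue) (*types.Empty, error)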
func (*PulsarSubmitServer) UpdateQueues ¶
func (srv *PulsarSubmitServer) UpdateQueues(ctx context.Context, req *api.QueueList) (*api.BatchQueueUpdateResponse, error)
type SchedulerJobRepositoryAdapter ¶ added in v0.3.50
type SchedulerJobRepositoryAdapter struct {
// contains filtered or unexported fields
}
func (*SchedulerJobRepositoryAdapter) GetExistingJobsByIds ¶ added in v0.3.50
func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) ([]schedulerinterfaces.LegacySchedulerJob, error)
func (*SchedulerJobRepositoryAdapter) GetQueueJobIds ¶ added in v0.3.50
func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) ([]string, error)
type SubmitFromLog ¶
type SubmitFromLog struct { SubmitServer *SubmitServer Consumer pulsar.Consumer // Logger from which the loggers used by this service are derived // (e.g., using srv.Logger.WithField), or nil, in which case the global logrus logger is used. Logger *logrus.Entry }
SubmitFromLog is a service that reads messages from Pulsar and updates the state of the Armada server accordingly (in particular, it writes to Redis). Calls into an embedded Armada submit server object.
func (*SubmitFromLog) BatchedCancelJobsById ¶
func (srv *SubmitFromLog) BatchedCancelJobsById(ctx *armadacontext.Context, userId string, cancelJobPayloads []*CancelJobPayload) (bool, error)
func (*SubmitFromLog) CancelJobSet ¶
func (srv *SubmitFromLog) CancelJobSet(ctx *armadacontext.Context, userId string, queueName string, jobSetName string, reason string) (bool, error)
func (*SubmitFromLog) CancelJobSets ¶
func (srv *SubmitFromLog) CancelJobSets( ctx *armadacontext.Context, userId string, queueName string, jobSetName string, events []*armadaevents.CancelJobSet, ) (bool, error)
CancelJobSets processes several CancelJobSet events. Because event sequences are specific to queue and job set, all CancelJobSet events in a sequence are equivalent, and we only need to call CancelJobSet once.
func (*SubmitFromLog) CancelJobs ¶
func (srv *SubmitFromLog) CancelJobs(ctx *armadacontext.Context, userId string, es []*armadaevents.CancelJob) (bool, error)
CancelJobs cancels all jobs specified by the provided events in a single operation.
func (*SubmitFromLog) CancelJobsById ¶
func (srv *SubmitFromLog) CancelJobsById(ctx *armadacontext.Context, userId string, cancelJobPayloads []*CancelJobPayload) ([]string, error)
CancelJobsById cancels all jobs with the specified ids.
func (*SubmitFromLog) DeleteFailedJobs ¶ added in v0.3.53
func (srv *SubmitFromLog) DeleteFailedJobs(ctx *armadacontext.Context, es []*armadaevents.EventSequence_Event) (bool, error)
func (*SubmitFromLog) ProcessSequence ¶
func (srv *SubmitFromLog) ProcessSequence(ctx *armadacontext.Context, sequence *armadaevents.EventSequence) bool
ProcessSequence processes all events in a particular sequence. For efficiency, we may process several events at a time. To maintain ordering, we only do so for subsequences of consecutive events of equal type. The returned bool indicates if the corresponding Pulsar message should be ack'd or not.
func (*SubmitFromLog) ProcessSubSequence ¶
func (srv *SubmitFromLog) ProcessSubSequence(ctx *armadacontext.Context, i int, sequence *armadaevents.EventSequence) (j int, err error)
ProcessSubSequence processes sequence.Events[i:j] (that is, events i through j-1 inclusive), where j is the index of the first event in the sequence of a type different from that of sequence.Events[i], or len(sequence.Events) if no such event exists in the sequence, and returns j.
Processing one such subsequence at a time preserves ordering between events of different types. For example, SubmitJob events are processed before CancelJob events that occur later in the sequence.
Events are processed by calling into the embedded srv.SubmitServer.
Not all events are handled by this processor since the legacy scheduler writes some transitions directly to the db.
func (*SubmitFromLog) ReprioritizeJobSet ¶
func (srv *SubmitFromLog) ReprioritizeJobSet( ctx *armadacontext.Context, userId string, queueName string, jobSetName string, e *armadaevents.ReprioritiseJobSet, ) (bool, error)
func (*SubmitFromLog) ReprioritizeJobSets ¶
func (srv *SubmitFromLog) ReprioritizeJobSets( ctx *armadacontext.Context, userId string, queueName string, jobSetName string, es []*armadaevents.ReprioritiseJobSet, ) (bool, error)
ReprioritizeJobSets updates the priority of several job sets. Returns a multierror containing all errors that occurred. Since repeating this operation is safe (setting the priority is idempotent), the bool indicating if events were processed is set to false if any job set failed.
func (*SubmitFromLog) ReprioritizeJobs ¶
func (srv *SubmitFromLog) ReprioritizeJobs(ctx *armadacontext.Context, userId string, es []*armadaevents.ReprioritiseJob) (bool, error)
ReprioritizeJobs updates the priority of one or more jobs.
func (*SubmitFromLog) Run ¶
func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error
Run the service that reads from Pulsar and updates Armada until the provided context is cancelled.
func (*SubmitFromLog) SubmitJobs ¶
func (srv *SubmitFromLog) SubmitJobs( ctx *armadacontext.Context, userId string, groups []string, queueName string, jobSetName string, es []*armadaevents.SubmitJob, ) (bool, error)
SubmitJobs processes several job submit events in bulk. It returns a boolean indicating if the events were processed and any error that occurred during processing. Specifically, events are not processed if writing to the database results in a network-related error. For any other error, the jobs are marked as failed and the events are considered to have been processed.
func (*SubmitFromLog) UpdateJobStartTimes ¶
func (srv *SubmitFromLog) UpdateJobStartTimes(ctx *armadacontext.Context, es []*armadaevents.EventSequence_Event) (bool, error)
UpdateJobStartTimes records the start time (in Redis) of one or more jobs.
type SubmitServer ¶
type SubmitServer struct {
// contains filtered or unexported fields
}
func NewSubmitServer ¶
func NewSubmitServer( permissions authorization.PermissionChecker, jobRepository repository.JobRepository, queueRepository repository.QueueRepository, eventStore repository.EventStore, schedulingInfoRepository repository.SchedulingInfoRepository, cancelJobsBatchSize int, queueManagementConfig *configuration.QueueManagementConfig, schedulingConfig *configuration.SchedulingConfig, ) *SubmitServer
func (*SubmitServer) CancelJobSet ¶
func (server *SubmitServer) CancelJobSet(grpcCtx context.Context, request *api.JobSetCancelRequest) (*types.Empty, error)
func (*SubmitServer) CancelJobs ¶
func (server *SubmitServer) CancelJobs(grpcCtx context.Context, request *api.JobCancelRequest) (*api.CancellationResult, error)
CancelJobs cancels jobs identified by the request. If the request contains a job ID, only the job with that ID is cancelled. If the request contains a queue name and a job set ID, all jobs matching those are cancelled.
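func (*SubmitServer) CreateQueue ¶
func (server *SubmitServer) CreateQueue(grpcCtx context.Context, request *api.Queue) (*types.Empty, error)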
func (*SubmitServer) CreateQueues ¶
func (server *SubmitServer) CreateQueues(grpcCtx context.Context, request *api.QueueList) (*api.BatchQueueCreateResponse, error)
func (*SubmitServer) DeleteQueue ¶
func (server *SubmitServer) DeleteQueue(grpcCtx context.Context, request *api.QueueDeleteRequest) (*types.Empty, error)
func (*SubmitServer) GetQueue ¶
func (server *SubmitServer) GetQueue(grpcCtx context.Context, req *api.QueueGetRequest) (*api.Queue, error)
func (*SubmitServer) GetQueueInfo ¶
func (server *SubmitServer) GetQueueInfo(grpcCtx context.Context, req *api.QueueInfoRequest) (*api.QueueInfo, error)
func (*SubmitServer) GetQueues ¶ added in v0.3.100
func (server *SubmitServer) GetQueues(req *api.StreamingQueueGetRequest, stream api.Submit_GetQueuesServer) error
func (*SubmitServer) Health ¶
func (server *SubmitServer) Health(ctx context.Context, _ *types.Empty) (*api.HealthCheckResponse, error)
func (*SubmitServer) ReprioritizeJobs ¶
func (server *SubmitServer) ReprioritizeJobs(grpcCtx context.Context, request *api.JobReprioritizeRequest) (*api.JobReprioritizeResponse, error)
ReprioritizeJobs updates the priority of one or more jobs. Returns a map from job ID to any error (or nil if the call succeeded).
func (*SubmitServer) SubmitJobs ¶
func (server *SubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error)
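func (*SubmitServer) UpdateQueue ¶
func (server *SubmitServer) UpdateQueue(grpcCtx context.Context, request *api.Queue) (*types.Empty, error)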
func (*SubmitServer) UpdateQueues ¶
func (server *SubmitServer) UpdateQueues(grpcCtx context.Context, request *api.QueueList) (*api.BatchQueueUpdateResponse, error)
type UsageServer ¶
type UsageServer struct {
// contains filtered or unexported fields
}
func NewUsageServer ¶
func NewUsageServer( permissions authorization.PermissionChecker, priorityHalfTime time.Duration, schedulingConfig *configuration.SchedulingConfig, usageRepository repository.UsageRepository, queueRepository repository.QueueRepository, ) *UsageServer
func (*UsageServer) ReportUsage ¶
func (s *UsageServer) ReportUsage(grpcCtx context.Context, report *api.ClusterUsageReport) (*types.Empty, error)