requester

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseEndpoint

type BaseEndpoint struct {
	// contains filtered or unexported fields
}

BaseEndpoint base implementation of requester Endpoint

func NewBaseEndpoint

func NewBaseEndpoint(params *BaseEndpointParams) *BaseEndpoint

func (*BaseEndpoint) ApproveJob added in v0.3.24

func (node *BaseEndpoint) ApproveJob(ctx context.Context, approval bidstrategy.ModerateJobRequest) error

func (*BaseEndpoint) CancelJob

func (node *BaseEndpoint) CancelJob(ctx context.Context, request CancelJobRequest) (CancelJobResult, error)

func (*BaseEndpoint) ReadLogs added in v0.3.26

func (node *BaseEndpoint) ReadLogs(ctx context.Context, request ReadLogsRequest) (ReadLogsResponse, error)

func (*BaseEndpoint) SubmitJob

func (node *BaseEndpoint) SubmitJob(ctx context.Context, data model.JobCreatePayload) (*model.Job, error)

func (*BaseEndpoint) VerifyExecutions added in v1.0.1

func (node *BaseEndpoint) VerifyExecutions(ctx context.Context, results external.ExternalVerificationResponse) error

type BaseEndpointParams

type BaseEndpointParams struct {
	ID                         string
	PublicKey                  []byte
	Queue                      Queue
	Selector                   bidstrategy.SemanticBidStrategy
	Store                      jobstore.Store
	ComputeEndpoint            compute.Endpoint
	Verifiers                  verifier.VerifierProvider
	StorageProviders           storage.StorageProvider
	MinJobExecutionTimeout     time.Duration
	DefaultJobExecutionTimeout time.Duration
	GetBiddingCallback         func() *url.URL
}

type BaseScheduler added in v0.3.25

type BaseScheduler struct {
	// contains filtered or unexported fields
}

func NewBaseScheduler added in v0.3.25

func NewBaseScheduler(params BaseSchedulerParams) *BaseScheduler

func (*BaseScheduler) CancelJob added in v0.3.25

func (s *BaseScheduler) CancelJob(ctx context.Context, request CancelJobRequest) (CancelJobResult, error)

func (*BaseScheduler) OnBidComplete added in v0.3.26

func (s *BaseScheduler) OnBidComplete(ctx context.Context, response compute.BidResult)

OnBidComplete implements compute.Callback

func (*BaseScheduler) OnCancelComplete added in v0.3.25

func (s *BaseScheduler) OnCancelComplete(ctx context.Context, result compute.CancelResult)

func (*BaseScheduler) OnComputeFailure added in v0.3.25

func (s *BaseScheduler) OnComputeFailure(ctx context.Context, result compute.ComputeError)

func (*BaseScheduler) OnPublishComplete added in v0.3.25

func (s *BaseScheduler) OnPublishComplete(ctx context.Context, result compute.PublishResult)

func (*BaseScheduler) OnRunComplete added in v0.3.25

func (s *BaseScheduler) OnRunComplete(ctx context.Context, result compute.RunResult)

func (*BaseScheduler) StartJob added in v0.3.25

func (s *BaseScheduler) StartJob(ctx context.Context, req StartJobRequest) (err error)

func (*BaseScheduler) TransitionJobState added in v1.0.1

func (s *BaseScheduler) TransitionJobState(ctx context.Context, jobID string)

TransitionJobState checks the current state of the job and transitions it to the next state if possible, along with triggering actions needed to transition, such as updating the job state and notifying compute nodes. This method is agnostic to how it was called to allow using the same logic as a response to callback from a compute node, or as a result of a periodic check that checks for stale jobs.

func (*BaseScheduler) VerifyExecutions added in v1.0.1

func (s *BaseScheduler) VerifyExecutions(
	ctx context.Context,
	verificationResults []verifier.VerifierResult,
) (succeeded, failed []verifier.VerifierResult)

type BaseSchedulerParams added in v0.3.25

type BaseSchedulerParams struct {
	ID                   string
	Host                 host.Host
	JobStore             jobstore.Store
	NodeSelector         NodeSelector
	OverAskForBidsFactor int
	RetryStrategy        RetryStrategy
	ComputeEndpoint      compute.Endpoint
	Verifiers            verifier.VerifierProvider
	StorageProviders     storage.StorageProvider
	EventEmitter         EventEmitter
	GetVerifyCallback    func() *url.URL
}

type CancelJobRequest

type CancelJobRequest struct {
	JobID         string
	Reason        string
	UserTriggered bool
}

type CancelJobResult

type CancelJobResult struct{}

type Endpoint

type Endpoint interface {
	// SubmitJob submits a new job to the network.
	SubmitJob(context.Context, model.JobCreatePayload) (*model.Job, error)
	// ApproveJob approves or rejects the running of a job.
	ApproveJob(context.Context, bidstrategy.ModerateJobRequest) error
	// CancelJob cancels an existing job.
	CancelJob(context.Context, CancelJobRequest) (CancelJobResult, error)
	// VerifyExecutions approves or rejects the publishing of an execution.
	VerifyExecutions(context.Context, external.ExternalVerificationResponse) error
	// ReadLogs retrieves the logs for an execution
	ReadLogs(context.Context, ReadLogsRequest) (ReadLogsResponse, error)
}

Endpoint is the frontend and entry point to the requester node for the end users to submit, update and cancel jobs.

type ErrJobAlreadyTerminal

type ErrJobAlreadyTerminal struct {
	JobID string
}

func NewErrJobAlreadyTerminal

func NewErrJobAlreadyTerminal(jobID string) ErrJobAlreadyTerminal

func (ErrJobAlreadyTerminal) Error

func (e ErrJobAlreadyTerminal) Error() string

type ErrNodeNotFound

type ErrNodeNotFound struct {
	// contains filtered or unexported fields
}

ErrNodeNotFound is returned when nodeInfo was not found for a requested peer id

func NewErrNodeNotFound

func NewErrNodeNotFound(peerID peer.ID) ErrNodeNotFound

func (ErrNodeNotFound) Error

func (e ErrNodeNotFound) Error() string

type ErrNotEnoughNodes

type ErrNotEnoughNodes struct {
	RequestedNodes int
	AvailableNodes int
}

ErrNotEnoughNodes is returned when not enough nodes in the network to run a job

func NewErrNotEnoughNodes

func NewErrNotEnoughNodes(requestedNodes, availableNodes int) ErrNotEnoughNodes

func (ErrNotEnoughNodes) Error

func (e ErrNotEnoughNodes) Error() string

type EventEmitter

type EventEmitter struct {
	// contains filtered or unexported fields
}

func NewEventEmitter

func NewEventEmitter(params EventEmitterParams) EventEmitter

func (EventEmitter) EmitBidAccepted

func (e EventEmitter) EmitBidAccepted(
	ctx context.Context, request compute.BidAcceptedRequest, response compute.BidAcceptedResponse)

func (EventEmitter) EmitBidReceived

func (e EventEmitter) EmitBidReceived(
	ctx context.Context, result compute.BidResult)

func (EventEmitter) EmitBidRejected

func (e EventEmitter) EmitBidRejected(
	ctx context.Context, request compute.BidRejectedRequest, response compute.BidRejectedResponse)

func (EventEmitter) EmitComputeFailure

func (e EventEmitter) EmitComputeFailure(ctx context.Context, executionID model.ExecutionID, err error)

func (EventEmitter) EmitEvent

func (e EventEmitter) EmitEvent(ctx context.Context, event model.JobEvent) error

func (EventEmitter) EmitEventSilently

func (e EventEmitter) EmitEventSilently(ctx context.Context, event model.JobEvent)

func (EventEmitter) EmitJobCanceled added in v0.3.26

func (e EventEmitter) EmitJobCanceled(ctx context.Context, req CancelJobRequest)

func (EventEmitter) EmitJobCreated

func (e EventEmitter) EmitJobCreated(
	ctx context.Context, job model.Job)

func (EventEmitter) EmitPublishComplete

func (e EventEmitter) EmitPublishComplete(ctx context.Context, response compute.PublishResult)

func (EventEmitter) EmitResultAccepted

func (e EventEmitter) EmitResultAccepted(
	ctx context.Context, request compute.ResultAcceptedRequest, response compute.ResultAcceptedResponse)

func (EventEmitter) EmitResultRejected

func (e EventEmitter) EmitResultRejected(
	ctx context.Context, request compute.ResultRejectedRequest, response compute.ResultRejectedResponse)

func (EventEmitter) EmitRunComplete

func (e EventEmitter) EmitRunComplete(ctx context.Context, response compute.RunResult)

type EventEmitterParams

type EventEmitterParams struct {
	EventConsumer eventhandler.JobEventHandler
}

A quick workaround to publish job events locally as we still have some types that rely on job events to update their states (e.g. localdb) and to take actions (e.g. websockets and logging) TODO: create a strongly typed local event emitter similar to libp2p event bus, and update localdb directly from

requester instead of consuming events.

type Housekeeping

type Housekeeping struct {
	// contains filtered or unexported fields
}

func NewHousekeeping

func NewHousekeeping(params HousekeepingParams) *Housekeeping

func (*Housekeeping) Stop

func (h *Housekeeping) Stop()

type HousekeepingParams

type HousekeepingParams struct {
	Endpoint Endpoint
	JobStore jobstore.Store
	NodeID   string
	Interval time.Duration
}

type NodeDiscoverer

type NodeDiscoverer interface {
	ListNodes(ctx context.Context) ([]model.NodeInfo, error)
	FindNodes(ctx context.Context, job model.Job) ([]model.NodeInfo, error)
}

NodeDiscoverer discovers nodes in the network that are suitable to execute a job.

type NodeRank

type NodeRank struct {
	NodeInfo model.NodeInfo
	Rank     int
}

NodeRank represents a node and its rank. The higher the rank, the more preferable a node is to execute the job. A negative rank means the node is not suitable to execute the job.

type NodeRanker

type NodeRanker interface {
	RankNodes(ctx context.Context, job model.Job, nodes []model.NodeInfo) ([]NodeRank, error)
}

NodeRanker ranks nodes based on their suitability to execute a job.

type NodeSelector added in v0.3.25

type NodeSelector struct {
	// contains filtered or unexported fields
}

func NewNodeSelector added in v0.3.25

func NewNodeSelector(params NodeSelectorParams) *NodeSelector

func (*NodeSelector) SelectNodes added in v0.3.25

func (s *NodeSelector) SelectNodes(ctx context.Context, job model.Job, minCount, desiredCount int) ([]NodeRank, error)

type NodeSelectorParams added in v0.3.25

type NodeSelectorParams struct {
	NodeDiscoverer NodeDiscoverer
	NodeRanker     NodeRanker
}

type Queue added in v0.3.24

type Queue interface {
	Scheduler

	EnqueueJob(context.Context, model.Job) error
}

func NewQueue added in v0.3.24

func NewQueue(store jobstore.Store, scheduler Scheduler, emitter EventEmitter) Queue

type ReadLogsRequest added in v0.3.26

type ReadLogsRequest struct {
	JobID       string
	ExecutionID string
	WithHistory bool
	Follow      bool
}

type ReadLogsResponse added in v0.3.26

type ReadLogsResponse struct {
	Address           string
	ExecutionComplete bool
}

type RetryRequest added in v0.3.25

type RetryRequest struct {
	JobID string
}

type RetryStrategy added in v0.3.25

type RetryStrategy interface {
	// ShouldRetry returns true if the job can be retried.
	ShouldRetry(ctx context.Context, request RetryRequest) bool
}

type Scheduler

type Scheduler interface {
	StartJob(context.Context, StartJobRequest) error
	CancelJob(context.Context, CancelJobRequest) (CancelJobResult, error)
	VerifyExecutions(context.Context, []verifier.VerifierResult) (succeeded, failed []verifier.VerifierResult)
}

Scheduler distributes jobs to the compute nodes and tracks the executions.

type StartJobRequest

type StartJobRequest struct {
	Job model.Job
}

StartJobRequest triggers the scheduling of a job.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL