broker

package
v0.0.0-...-beee317
Warning

This package is not in the latest version of its module.

Published: Jan 3, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func PreCheckConfig

func PreCheckConfig(config *resModel.Config) error

PreCheckConfig checks the configuration of external storage.

Types

type Broker

type Broker interface {
	pb.BrokerServiceServer

	// OpenStorage creates a storage Handle for a worker.
	OpenStorage(
		ctx context.Context,
		projectInfo tenant.ProjectInfo,
		workerID resModel.WorkerID,
		jobID resModel.JobID,
		resourcePath resModel.ResourceID,
		opts ...OpenStorageOption,
	) (Handle, error)

	// OnWorkerClosed is called when a worker is closing.
	// The implementation should do necessary garbage collection
	// for the worker, especially local temporary files.
	OnWorkerClosed(
		ctx context.Context,
		workerID resModel.WorkerID,
		jobID resModel.JobID,
	)

	GetEnabledBucketStorage() (bool, resModel.ResourceType)

	Close()
}

A Broker is created and maintained by the executor and provides file resources to the tasks.

type DefaultBroker

type DefaultBroker struct {
	// contains filtered or unexported fields
}

DefaultBroker implements the Broker interface.

func NewBroker

func NewBroker(
	ctx context.Context,
	executorID resModel.ExecutorID,
	client client.ServerMasterClient,
) (*DefaultBroker, error)

NewBroker creates a new DefaultBroker instance.

func NewBrokerWithConfig

func NewBrokerWithConfig(
	config *resModel.Config,
	executorID resModel.ExecutorID,
	client client.ResourceManagerClient,
) (*DefaultBroker, error)

NewBrokerWithConfig creates a new DefaultBroker instance based on the given config.

func (*DefaultBroker) Close

func (b *DefaultBroker) Close()

Close cleans up the broker.

func (*DefaultBroker) GetEnabledBucketStorage

func (b *DefaultBroker) GetEnabledBucketStorage() (bool, resModel.ResourceType)

GetEnabledBucketStorage returns true and the corresponding resource type if bucket storage is enabled.
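A caller would typically branch on the boolean before trusting the returned resource type. The following is a minimal sketch of that pattern using stand-in types, not the package's own: `ResourceType`, `bucketChecker`, `fixedChecker`, `pickResourceType`, and the `"local"` and `"s3"` values are all assumptions made for illustration.

```go
package main

import "fmt"

// ResourceType is a stand-in for resModel.ResourceType (assumed to be string-like).
type ResourceType string

// localFile is a hypothetical fallback value used only in this sketch.
const localFile ResourceType = "local"

// bucketChecker captures just the one method discussed here.
type bucketChecker interface {
	GetEnabledBucketStorage() (bool, ResourceType)
}

// fixedChecker is a hypothetical implementation that returns fixed values.
type fixedChecker struct {
	enabled bool
	tp      ResourceType
}

func (c fixedChecker) GetEnabledBucketStorage() (bool, ResourceType) {
	return c.enabled, c.tp
}

// pickResourceType uses the bucket type only when bucket storage is enabled,
// and falls back to the local type otherwise.
func pickResourceType(b bucketChecker) ResourceType {
	if ok, tp := b.GetEnabledBucketStorage(); ok {
		return tp
	}
	return localFile
}

func main() {
	fmt.Println(pickResourceType(fixedChecker{enabled: true, tp: "s3"}))
	fmt.Println(pickResourceType(fixedChecker{enabled: false}))
}
```

The second return value is ignored whenever the first is false, which avoids relying on whatever the implementation returns in the disabled case.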

func (*DefaultBroker) OnWorkerClosed

func (b *DefaultBroker) OnWorkerClosed(ctx context.Context, workerID resModel.WorkerID, jobID resModel.JobID)

OnWorkerClosed implements Broker.OnWorkerClosed.

func (*DefaultBroker) OpenStorage

func (b *DefaultBroker) OpenStorage(
	ctx context.Context,
	projectInfo tenant.ProjectInfo,
	workerID resModel.WorkerID,
	jobID resModel.JobID,
	resID resModel.ResourceID,
	opts ...OpenStorageOption,
) (Handle, error)

OpenStorage implements Broker.OpenStorage.

func (*DefaultBroker) RemoveResource

RemoveResource implements pb.BrokerServiceServer.

type Handle

type Handle interface {
	ID() resModel.ResourceID
	BrExternalStorage() brStorage.ExternalStorage
	Persist(ctx context.Context) error
	Discard(ctx context.Context) error
}

Handle defines an interface for interacting with the framework.

type MockBroker

type MockBroker struct {
	*DefaultBroker
	// contains filtered or unexported fields
}

MockBroker is a broker used for testing other components that depend on a Broker.

func NewBrokerForTesting

func NewBrokerForTesting(executorID resModel.ExecutorID) *MockBroker

NewBrokerForTesting creates a MockBroker instance for testing only.

func (*MockBroker) AssertFileExists

func (b *MockBroker) AssertFileExists(
	t *testing.T,
	workerID resModel.WorkerID,
	resourceID resModel.ResourceID,
	fileName string,
)

AssertFileExists checks that the given file exists.

func (*MockBroker) AssertPersisted

func (b *MockBroker) AssertPersisted(t *testing.T, id resModel.ResourceID)

AssertPersisted checks that the resource is in the persisted list.

func (*MockBroker) OpenStorage

func (b *MockBroker) OpenStorage(
	ctx context.Context,
	projectInfo tenant.ProjectInfo,
	workerID resModel.WorkerID,
	jobID resModel.JobID,
	resourcePath resModel.ResourceID,
	opts ...OpenStorageOption,
) (Handle, error)

OpenStorage wraps broker.OpenStorage.
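MockBroker embeds *DefaultBroker and redefines OpenStorage, which is Go's usual way of wrapping a method while inheriting the rest. The following sketch shows the general technique with simplified stand-in types; `realBroker`, `mockBroker`, the string-based signatures, and the `opened` field are hypothetical and are not the package's actual fields or behavior.

```go
package main

import "fmt"

// realBroker is a stand-in for the embedded production type.
type realBroker struct{}

// OpenStorage returns a fake handle name for the given resource.
func (b *realBroker) OpenStorage(resourceID string) (string, error) {
	return "handle:" + resourceID, nil
}

// Close is promoted to any type that embeds *realBroker.
func (b *realBroker) Close() {}

// mockBroker embeds *realBroker and records every resource that is opened.
type mockBroker struct {
	*realBroker
	opened []string
}

// OpenStorage shadows the embedded method, records the call,
// then delegates to the embedded implementation.
func (m *mockBroker) OpenStorage(resourceID string) (string, error) {
	m.opened = append(m.opened, resourceID)
	return m.realBroker.OpenStorage(resourceID)
}

func main() {
	m := &mockBroker{realBroker: &realBroker{}}
	h, err := m.OpenStorage("/local/example")
	fmt.Println(h, err, m.opened)
	m.Close() // promoted from the embedded type, not redefined
}
```

Methods that the wrapper does not redefine are promoted unchanged, so a test double built this way only needs to override the calls it wants to observe.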

type OpenStorageOption

type OpenStorageOption func(*openStorageOptions)

OpenStorageOption is an option for OpenStorage.

func WithCleanBeforeOpen

func WithCleanBeforeOpen() OpenStorageOption

WithCleanBeforeOpen indicates that the storage should be cleaned before it is opened.
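OpenStorageOption follows Go's functional-options pattern: each option is a function that mutates an unexported options struct. A minimal, self-contained sketch of how such options are usually defined and applied is given here. The `cleanBeforeOpen` field and the `applyOptions` helper are assumptions, since the real struct's fields are not exported or documented.

```go
package main

import "fmt"

// openStorageOptions mirrors the unexported struct named in the
// OpenStorageOption signature; its single field is an assumption.
type openStorageOptions struct {
	cleanBeforeOpen bool
}

// OpenStorageOption has the same shape as the documented type.
type OpenStorageOption func(*openStorageOptions)

// WithCleanBeforeOpen requests that the storage be cleaned before it is opened.
func WithCleanBeforeOpen() OpenStorageOption {
	return func(o *openStorageOptions) {
		o.cleanBeforeOpen = true
	}
}

// applyOptions is a hypothetical helper that folds the variadic
// options into one struct, starting from zero-value defaults.
func applyOptions(opts ...OpenStorageOption) openStorageOptions {
	var o openStorageOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(applyOptions().cleanBeforeOpen)                      // false
	fmt.Println(applyOptions(WithCleanBeforeOpen()).cleanBeforeOpen) // true
}
```

With this pattern, new options can be added later without changing the OpenStorage signature, and callers that pass no options get the defaults.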

type ResourceHandle

type ResourceHandle struct {
	// contains filtered or unexported fields
}

ResourceHandle contains a brStorage.ExternalStorage. It helps Dataflow Engine reuse the external storage facilities implemented in Br.

func (*ResourceHandle) BrExternalStorage

func (h *ResourceHandle) BrExternalStorage() brStorage.ExternalStorage

BrExternalStorage implements Handle.BrExternalStorage.

func (*ResourceHandle) Discard

func (h *ResourceHandle) Discard(ctx context.Context) error

Discard implements Handle.Discard. Note that the current design does not allow multiple workers to hold persistent resources simultaneously.

func (*ResourceHandle) ID

ID implements Handle.ID.

func (*ResourceHandle) Persist

func (h *ResourceHandle) Persist(ctx context.Context) error

Persist implements Handle.Persist.
