Documentation
¶
Index ¶
- Constants
- func MockBaseMasterCreateWorker(t *testing.T, master *DefaultBaseMaster, workerType frameModel.WorkerType, ...)
- func MockBaseMasterCreateWorkerMetScheduleTaskError(t *testing.T, master *DefaultBaseMaster, workerType frameModel.WorkerType, ...)
- func MockBaseMasterWorkerHeartbeat(t *testing.T, master *DefaultBaseMaster, masterID frameModel.MasterID, ...) error
- func MockBaseMasterWorkerUpdateStatus(ctx context.Context, t *testing.T, master *DefaultBaseMaster, ...)
- func MockBaseWorkerCheckSendMessage(t *testing.T, worker *DefaultBaseWorker, topic p2p.Topic, message interface{})
- func MockBaseWorkerWaitUpdateStatus(t *testing.T, worker *DefaultBaseWorker)
- func MockMasterPrepareMeta(ctx context.Context, t *testing.T, master *MockMasterImpl)
- func MustConvertWorkerType2JobType(tp WorkerType) engineModel.JobType
- type BaseJobMaster
- type BaseJobMasterExt
- type BaseMaster
- type BaseWorker
- type BaseWorkerForTesting
- type CreateWorkerOpt
- type DefaultBaseJobMaster
- func (d *DefaultBaseJobMaster) Close(ctx context.Context) error
- func (d *DefaultBaseJobMaster) CreateWorker(workerType frameModel.WorkerType, config WorkerConfig, opts ...CreateWorkerOpt) (frameModel.WorkerID, error)
- func (d *DefaultBaseJobMaster) CurrentEpoch() frameModel.Epoch
- func (d *DefaultBaseJobMaster) Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error
- func (d *DefaultBaseJobMaster) GetEnabledBucketStorage() (bool, resModel.ResourceType)
- func (d *DefaultBaseJobMaster) GetWorkers() map[frameModel.WorkerID]WorkerHandle
- func (d *DefaultBaseJobMaster) ID() runtime.RunnableID
- func (d *DefaultBaseJobMaster) Init(ctx context.Context) error
- func (d *DefaultBaseJobMaster) IsBaseJobMaster()
- func (d *DefaultBaseJobMaster) IsBaseJobMasterExt()
- func (d *DefaultBaseJobMaster) IsMasterReady() bool
- func (d *DefaultBaseJobMaster) Logger() *zap.Logger
- func (d *DefaultBaseJobMaster) MetaKVClient() metaModel.KVClient
- func (d *DefaultBaseJobMaster) MetricFactory() promutil.Factory
- func (d *DefaultBaseJobMaster) NotifyExit(ctx context.Context, errIn error) (retErr error)
- func (d *DefaultBaseJobMaster) Poll(ctx context.Context) error
- func (d *DefaultBaseJobMaster) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error
- func (d *DefaultBaseJobMaster) Stop(ctx context.Context) error
- func (d *DefaultBaseJobMaster) TriggerOpenAPIInitialize(apiGroup *gin.RouterGroup)
- func (d *DefaultBaseJobMaster) UpdateJobStatus(ctx context.Context, status frameModel.WorkerStatus) error
- func (d *DefaultBaseJobMaster) UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error
- type DefaultBaseMaster
- func (m *DefaultBaseMaster) Close(ctx context.Context) error
- func (m *DefaultBaseMaster) CreateWorker(workerType frameModel.WorkerType, config WorkerConfig, opts ...CreateWorkerOpt) (frameModel.WorkerID, error)
- func (m *DefaultBaseMaster) DeleteProjectInfo(workerID frameModel.WorkerID)
- func (m *DefaultBaseMaster) Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error
- func (m *DefaultBaseMaster) GetProjectInfo(masterID frameModel.MasterID) tenant.ProjectInfo
- func (m *DefaultBaseMaster) GetWorkers() map[frameModel.WorkerID]WorkerHandle
- func (m *DefaultBaseMaster) Init(ctx context.Context) error
- func (m *DefaultBaseMaster) InitProjectInfosAfterRecover(jobs []*frameModel.MasterMeta)
- func (m *DefaultBaseMaster) IsMasterReady() bool
- func (m *DefaultBaseMaster) Logger() *zap.Logger
- func (m *DefaultBaseMaster) MasterID() frameModel.MasterID
- func (m *DefaultBaseMaster) MasterMeta() *frameModel.MasterMeta
- func (m *DefaultBaseMaster) MetaKVClient() metaModel.KVClient
- func (m *DefaultBaseMaster) MetricFactory() promutil.Factory
- func (m *DefaultBaseMaster) NotifyExit(ctx context.Context, errIn error) error
- func (m *DefaultBaseMaster) Poll(ctx context.Context) error
- func (m *DefaultBaseMaster) PrepareWorkerConfig(workerType frameModel.WorkerType, config WorkerConfig) (rawConfig []byte, workerID frameModel.WorkerID, err error)
- func (m *DefaultBaseMaster) SetProjectInfo(workerID frameModel.WorkerID, projectInfo tenant.ProjectInfo)
- func (m *DefaultBaseMaster) Stop(ctx context.Context) error
- type DefaultBaseWorker
- func (w *DefaultBaseWorker) Close(ctx context.Context) error
- func (w *DefaultBaseWorker) Exit(ctx context.Context, exitReason ExitReason, err error, extBytes []byte) (errRet error)
- func (w *DefaultBaseWorker) GetEnabledBucketStorage() (bool, resModel.ResourceType)
- func (w *DefaultBaseWorker) ID() runtime.RunnableID
- func (w *DefaultBaseWorker) Init(ctx context.Context) error
- func (w *DefaultBaseWorker) Logger() *zap.Logger
- func (w *DefaultBaseWorker) MetaKVClient() metaModel.KVClient
- func (w *DefaultBaseWorker) MetricFactory() promutil.Factory
- func (w *DefaultBaseWorker) NotifyExit(ctx context.Context, errIn error) (retErr error)
- func (w *DefaultBaseWorker) OpenStorage(ctx context.Context, resourcePath resModel.ResourceID, ...) (broker.Handle, error)
- func (w *DefaultBaseWorker) Poll(ctx context.Context) error
- func (w *DefaultBaseWorker) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error
- func (w *DefaultBaseWorker) Stop(ctx context.Context) error
- func (w *DefaultBaseWorker) UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error
- type ExitReason
- type JobMasterImpl
- type Master
- type MasterFailoverReason
- type MasterFailoverReasonCode
- type MasterImpl
- type MessageRouter
- type MockHandle
- type MockMasterImpl
- func (m *MockMasterImpl) CloseImpl(ctx context.Context)
- func (m *MockMasterImpl) GetFrameMetaClient() pkgOrm.Client
- func (m *MockMasterImpl) InitImpl(ctx context.Context) error
- func (m *MockMasterImpl) MasterClient() *client.MockServerMasterClient
- func (m *MockMasterImpl) OnMasterRecovered(ctx context.Context) error
- func (m *MockMasterImpl) OnWorkerDispatched(worker WorkerHandle, result error) error
- func (m *MockMasterImpl) OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message interface{}) error
- func (m *MockMasterImpl) OnWorkerOffline(worker WorkerHandle, reason error) error
- func (m *MockMasterImpl) OnWorkerOnline(worker WorkerHandle) error
- func (m *MockMasterImpl) OnWorkerStatusUpdated(worker WorkerHandle, newStatus *frameModel.WorkerStatus) error
- func (m *MockMasterImpl) Reset()
- func (m *MockMasterImpl) StopImpl(ctx context.Context)
- func (m *MockMasterImpl) Tick(ctx context.Context) error
- func (m *MockMasterImpl) TickCount() int64
- type MockWorkerHandler
- func (m *MockWorkerHandler) CleanTombstone(ctx context.Context) error
- func (m *MockWorkerHandler) GetTombstone() master.TombstoneHandle
- func (m *MockWorkerHandler) ID() frameModel.WorkerID
- func (m *MockWorkerHandler) IsTombStone() bool
- func (m *MockWorkerHandler) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error
- func (m *MockWorkerHandler) Status() *frameModel.WorkerStatus
- func (m *MockWorkerHandler) Unwrap() master.RunningHandle
- type Worker
- type WorkerConfig
- type WorkerHandle
- type WorkerImpl
- type WorkerType
Constants ¶
const ( MasterTimedOut = MasterFailoverReasonCode(iota + 1) MasterReportedError )
Defines all reason codes
const ( ExitReasonUnknown = ExitReason(iota) ExitReasonFinished ExitReasonCanceled ExitReasonFailed )
define some ExitReason
Variables ¶
This section is empty.
Functions ¶
func MockBaseMasterCreateWorker ¶
func MockBaseMasterCreateWorker( t *testing.T, master *DefaultBaseMaster, workerType frameModel.WorkerType, config WorkerConfig, masterID frameModel.MasterID, workerID frameModel.WorkerID, executorID model.ExecutorID, resources []resModel.ResourceID, workerEpoch frameModel.Epoch, )
MockBaseMasterCreateWorker mocks to create worker in base master
func MockBaseMasterCreateWorkerMetScheduleTaskError ¶
func MockBaseMasterCreateWorkerMetScheduleTaskError( t *testing.T, master *DefaultBaseMaster, workerType frameModel.WorkerType, config WorkerConfig, masterID frameModel.MasterID, workerID frameModel.WorkerID, executorID model.ExecutorID, )
MockBaseMasterCreateWorkerMetScheduleTaskError mocks ScheduleTask meets error
func MockBaseMasterWorkerHeartbeat ¶
func MockBaseMasterWorkerHeartbeat( t *testing.T, master *DefaultBaseMaster, masterID frameModel.MasterID, workerID frameModel.WorkerID, executorID p2p.NodeID, ) error
MockBaseMasterWorkerHeartbeat sends HeartbeatPingMessage with mock message handler
func MockBaseMasterWorkerUpdateStatus ¶
func MockBaseMasterWorkerUpdateStatus( ctx context.Context, t *testing.T, master *DefaultBaseMaster, masterID frameModel.MasterID, workerID frameModel.WorkerID, executorID p2p.NodeID, status *frameModel.WorkerStatus, )
MockBaseMasterWorkerUpdateStatus mocks to store status in metastore and sends WorkerStatusMessage.
func MockBaseWorkerCheckSendMessage ¶
func MockBaseWorkerCheckSendMessage( t *testing.T, worker *DefaultBaseWorker, topic p2p.Topic, message interface{}, )
MockBaseWorkerCheckSendMessage checks can receive one message from mock message sender
func MockBaseWorkerWaitUpdateStatus ¶
func MockBaseWorkerWaitUpdateStatus( t *testing.T, worker *DefaultBaseWorker, )
MockBaseWorkerWaitUpdateStatus checks that an update status message can be received from the mock message sender
func MockMasterPrepareMeta ¶
func MockMasterPrepareMeta(ctx context.Context, t *testing.T, master *MockMasterImpl)
MockMasterPrepareMeta simulates the meta persistence for MockMasterImpl
func MustConvertWorkerType2JobType ¶
func MustConvertWorkerType2JobType(tp WorkerType) engineModel.JobType
MustConvertWorkerType2JobType returns the job type of the given worker type. It panics if the conversion fails. TODO: let user register a unique identifier for the metric prefix
Types ¶
type BaseJobMaster ¶
type BaseJobMaster interface { Worker // MetaKVClient return business metastore kv client with job-level isolation MetaKVClient() metaModel.KVClient // MetricFactory return a promethus factory with some underlying labels(e.g. job-id, work-id) MetricFactory() promutil.Factory // Logger return a zap logger with some underlying fields(e.g. job-id) Logger() *zap.Logger // GetWorkers return the handle of all workers, from which we can get the worker status、worker id and // the method for sending message to specific worker GetWorkers() map[frameModel.WorkerID]WorkerHandle // CreateWorker requires the framework to dispatch a new worker. // If the worker needs to access certain file system resources, it must pass // resource ID via CreateWorkerOpt CreateWorker( workerType frameModel.WorkerType, config WorkerConfig, opts ...CreateWorkerOpt, ) (frameModel.WorkerID, error) // UpdateJobStatus updates jobmaster(worker of jobmanager) status and // sends a 'status updated' message to jobmanager UpdateJobStatus(ctx context.Context, status frameModel.WorkerStatus) error // CurrentEpoch return the epoch of current job CurrentEpoch() frameModel.Epoch // SendMessage sends a message of specific topic to jobmanager in a blocking or nonblocking way SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error // Exit should be called when jobmaster (in user logic) wants to exit. // exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error // IsMasterReady returns whether the master has received heartbeats for all // workers after a fail-over. If this is the first time the JobMaster started up, // the return value is always true. IsMasterReady() bool // IsBaseJobMaster is an empty function used to prevent accidental implementation // of this interface. 
IsBaseJobMaster() // GetEnabledBucketStorage returns whether the bucket storage is enabled and the corresponding resource type // if the bucket exists GetEnabledBucketStorage() (bool, resModel.ResourceType) }
BaseJobMaster defines an interface that can work as a job master, it embeds a Worker interface which can run on dataflow engine runtime, and also provides some utility methods.
func NewBaseJobMaster ¶
func NewBaseJobMaster( ctx *dcontext.Context, jobMasterImpl JobMasterImpl, masterID frameModel.MasterID, workerID frameModel.WorkerID, tp frameModel.WorkerType, workerEpoch frameModel.Epoch, ) BaseJobMaster
NewBaseJobMaster creates a new DefaultBaseJobMaster instance
type BaseJobMasterExt ¶
type BaseJobMasterExt interface { // TriggerOpenAPIInitialize is used to trigger the initialization of openapi handler. // It just delegates to the JobMasterImpl.OnOpenAPIInitialized. TriggerOpenAPIInitialize(apiGroup *gin.RouterGroup) // IsBaseJobMasterExt is an empty function used to prevent accidental implementation // of this interface. IsBaseJobMasterExt() }
BaseJobMasterExt extends BaseJobMaster with some extra methods. These methods are used by the framework and are not visible to JobMasterImpl.
type BaseMaster ¶
type BaseMaster interface { Master // MetaKVClient return business metastore kv client with job-level isolation MetaKVClient() metaModel.KVClient // MetricFactory return a promethus factory with some underlying labels(e.g. job-id, work-id) MetricFactory() promutil.Factory // Logger return a zap logger with some underlying fields(e.g. job-id) Logger() *zap.Logger // MasterMeta return the meta data of master MasterMeta() *frameModel.MasterMeta // GetWorkers return the handle of all workers, from which we can get the worker status、worker id and // the method for sending message to specific worker GetWorkers() map[frameModel.WorkerID]WorkerHandle // IsMasterReady returns whether the master has received heartbeats for all // workers after a fail-over. If this is the first time the JobMaster started up, // the return value is always true. IsMasterReady() bool // Exit should be called when master (in user logic) wants to exit. // exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed // NOTE: Currently, no implement has used this method, but we still keep it to make the interface intact Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error // CreateWorker is the latest version of CreateWorker, but with // a more flexible way of passing options. // If the worker needs to access certain file system resources, it must pass // resource ID via CreateWorkerOpt CreateWorker( workerType frameModel.WorkerType, config WorkerConfig, opts ...CreateWorkerOpt, ) (frameModel.WorkerID, error) }
BaseMaster defines the master interface, it embeds the Master interface and contains more core logic of a master
func NewBaseMaster ¶
func NewBaseMaster( ctx *dcontext.Context, impl MasterImpl, id frameModel.MasterID, tp frameModel.WorkerType, ) BaseMaster
NewBaseMaster creates a new DefaultBaseMaster instance
type BaseWorker ¶
type BaseWorker interface { Worker // MetaKVClient return business metastore kv client with job-level isolation MetaKVClient() metaModel.KVClient // MetricFactory return a promethus factory with some underlying labels(e.g. job-id, work-id) MetricFactory() promutil.Factory // Logger return a zap logger with some underlying fields(e.g. job-id) Logger() *zap.Logger // UpdateStatus persists the status to framework metastore if worker status is changed and // sends 'status updated message' to master. UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error // SendMessage sends a message of specific topic to master in a blocking or nonblocking way SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error // OpenStorage creates a resource and return the resource handle OpenStorage( ctx context.Context, resourcePath resModel.ResourceID, opts ...broker.OpenStorageOption, ) (broker.Handle, error) // GetEnabledBucketStorage returns whether the bucket storage is enabled and // the resource type if bucket exists GetEnabledBucketStorage() (bool, resModel.ResourceType) // Exit should be called when worker (in user logic) wants to exit. // exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed Exit(ctx context.Context, exitReason ExitReason, err error, extBytes []byte) error }
BaseWorker defines the worker interface, it embeds a Worker interface and adds more utility methods TODO: decouple the BaseWorker and WorkerService(for business)
func NewBaseWorker ¶
func NewBaseWorker( ctx *dcontext.Context, impl WorkerImpl, workerID frameModel.WorkerID, masterID frameModel.MasterID, tp frameModel.WorkerType, epoch frameModel.Epoch, ) BaseWorker
NewBaseWorker creates a new BaseWorker instance
type BaseWorkerForTesting ¶
type BaseWorkerForTesting struct { *DefaultBaseWorker Broker *broker.MockBroker }
BaseWorkerForTesting mocks base worker
func MockBaseWorker ¶
func MockBaseWorker( workerID frameModel.WorkerID, masterID frameModel.MasterID, workerImpl WorkerImpl, ) *BaseWorkerForTesting
MockBaseWorker creates a mock base worker for test
type CreateWorkerOpt ¶
type CreateWorkerOpt = master.CreateWorkerOpt
CreateWorkerOpt specifies an option for creating a worker.
func CreateWorkerWithResourceRequirements ¶
func CreateWorkerWithResourceRequirements(resources ...resModel.ResourceID) CreateWorkerOpt
CreateWorkerWithResourceRequirements specifies the resource requirement of a worker.
func CreateWorkerWithSelectors ¶
func CreateWorkerWithSelectors(selectors ...*label.Selector) CreateWorkerOpt
CreateWorkerWithSelectors specifies the selectors used to dispatch the worker.
type DefaultBaseJobMaster ¶
type DefaultBaseJobMaster struct {
// contains filtered or unexported fields
}
DefaultBaseJobMaster implements BaseJobMaster interface
func (*DefaultBaseJobMaster) Close ¶
func (d *DefaultBaseJobMaster) Close(ctx context.Context) error
Close implements BaseJobMaster.Close
func (*DefaultBaseJobMaster) CreateWorker ¶
func (d *DefaultBaseJobMaster) CreateWorker( workerType frameModel.WorkerType, config WorkerConfig, opts ...CreateWorkerOpt, ) (frameModel.WorkerID, error)
CreateWorker implements BaseJobMaster.CreateWorker
func (*DefaultBaseJobMaster) CurrentEpoch ¶
func (d *DefaultBaseJobMaster) CurrentEpoch() frameModel.Epoch
CurrentEpoch implements BaseJobMaster.CurrentEpoch
func (*DefaultBaseJobMaster) Exit ¶
func (d *DefaultBaseJobMaster) Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error
Exit implements BaseJobMaster.Exit
func (*DefaultBaseJobMaster) GetEnabledBucketStorage ¶
func (d *DefaultBaseJobMaster) GetEnabledBucketStorage() (bool, resModel.ResourceType)
GetEnabledBucketStorage implements BaseJobMaster.GetEnabledBucketStorage
func (*DefaultBaseJobMaster) GetWorkers ¶
func (d *DefaultBaseJobMaster) GetWorkers() map[frameModel.WorkerID]WorkerHandle
GetWorkers implements BaseJobMaster.GetWorkers
func (*DefaultBaseJobMaster) ID ¶
func (d *DefaultBaseJobMaster) ID() runtime.RunnableID
ID delegates the ID of inner worker
func (*DefaultBaseJobMaster) Init ¶
func (d *DefaultBaseJobMaster) Init(ctx context.Context) error
Init implements BaseJobMaster.Init
func (*DefaultBaseJobMaster) IsBaseJobMaster ¶
func (d *DefaultBaseJobMaster) IsBaseJobMaster()
IsBaseJobMaster implements BaseJobMaster.IsBaseJobMaster
func (*DefaultBaseJobMaster) IsBaseJobMasterExt ¶
func (d *DefaultBaseJobMaster) IsBaseJobMasterExt()
IsBaseJobMasterExt implements BaseJobMasterExt.IsBaseJobMasterExt.
func (*DefaultBaseJobMaster) IsMasterReady ¶
func (d *DefaultBaseJobMaster) IsMasterReady() bool
IsMasterReady implements BaseJobMaster.IsMasterReady
func (*DefaultBaseJobMaster) Logger ¶
func (d *DefaultBaseJobMaster) Logger() *zap.Logger
Logger implements BaseJobMaster.Logger
func (*DefaultBaseJobMaster) MetaKVClient ¶
func (d *DefaultBaseJobMaster) MetaKVClient() metaModel.KVClient
MetaKVClient implements BaseJobMaster.MetaKVClient
func (*DefaultBaseJobMaster) MetricFactory ¶
func (d *DefaultBaseJobMaster) MetricFactory() promutil.Factory
MetricFactory implements BaseJobMaster.MetricFactory
func (*DefaultBaseJobMaster) NotifyExit ¶
func (d *DefaultBaseJobMaster) NotifyExit(ctx context.Context, errIn error) (retErr error)
NotifyExit implements BaseJobMaster interface
func (*DefaultBaseJobMaster) Poll ¶
func (d *DefaultBaseJobMaster) Poll(ctx context.Context) error
Poll implements BaseJobMaster.Poll
func (*DefaultBaseJobMaster) SendMessage ¶
func (d *DefaultBaseJobMaster) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error
SendMessage delegates the SendMessage of inner worker
func (*DefaultBaseJobMaster) Stop ¶
func (d *DefaultBaseJobMaster) Stop(ctx context.Context) error
Stop implements BaseJobMaster.Stop
func (*DefaultBaseJobMaster) TriggerOpenAPIInitialize ¶
func (d *DefaultBaseJobMaster) TriggerOpenAPIInitialize(apiGroup *gin.RouterGroup)
TriggerOpenAPIInitialize implements BaseJobMasterExt.TriggerOpenAPIInitialize.
func (*DefaultBaseJobMaster) UpdateJobStatus ¶
func (d *DefaultBaseJobMaster) UpdateJobStatus(ctx context.Context, status frameModel.WorkerStatus) error
UpdateJobStatus implements BaseJobMaster.UpdateJobStatus
func (*DefaultBaseJobMaster) UpdateStatus ¶
func (d *DefaultBaseJobMaster) UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error
UpdateStatus delegates the UpdateStatus of inner worker
type DefaultBaseMaster ¶
type DefaultBaseMaster struct { Impl MasterImpl // contains filtered or unexported fields }
DefaultBaseMaster implements BaseMaster interface
func MockBaseMaster ¶
func MockBaseMaster(t *testing.T, id frameModel.MasterID, masterImpl MasterImpl) *DefaultBaseMaster
MockBaseMaster returns a mock DefaultBaseMaster
func (*DefaultBaseMaster) Close ¶
func (m *DefaultBaseMaster) Close(ctx context.Context) error
Close implements BaseMaster.Close
func (*DefaultBaseMaster) CreateWorker ¶
func (m *DefaultBaseMaster) CreateWorker( workerType frameModel.WorkerType, config WorkerConfig, opts ...CreateWorkerOpt, ) (frameModel.WorkerID, error)
CreateWorker implements BaseMaster.CreateWorker
func (*DefaultBaseMaster) DeleteProjectInfo ¶
func (m *DefaultBaseMaster) DeleteProjectInfo(workerID frameModel.WorkerID)
DeleteProjectInfo deletes the project info of a specific worker. NOTICE: Only used by JobManager when stopping a job
func (*DefaultBaseMaster) Exit ¶
func (m *DefaultBaseMaster) Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error
Exit implements BaseMaster.Exit NOTE: Currently, no implement has used this method, but we still keep it to make the interface intact
func (*DefaultBaseMaster) GetProjectInfo ¶
func (m *DefaultBaseMaster) GetProjectInfo(masterID frameModel.MasterID) tenant.ProjectInfo
GetProjectInfo get the project info of the worker [WARN]: Once 'DeleteProjectInfo' is called, 'GetProjectInfo' may return unexpected project info For JobManager: It will set the <jobID, projectInfo> pair in advance. So if we call 'GetProjectInfo' before 'DeleteProjectInfo', we can expect a correct projectInfo. For JobMaster: Master and worker always have the same projectInfo and workerProjectMap is empty
func (*DefaultBaseMaster) GetWorkers ¶
func (m *DefaultBaseMaster) GetWorkers() map[frameModel.WorkerID]WorkerHandle
GetWorkers implements BaseMaster.GetWorkers
func (*DefaultBaseMaster) Init ¶
func (m *DefaultBaseMaster) Init(ctx context.Context) error
Init implements BaseMaster.Init
func (*DefaultBaseMaster) InitProjectInfosAfterRecover ¶
func (m *DefaultBaseMaster) InitProjectInfosAfterRecover(jobs []*frameModel.MasterMeta)
InitProjectInfosAfterRecover sets project infos for all workers after the master recovers. NOTICE: Only used by JobManager on failover
func (*DefaultBaseMaster) IsMasterReady ¶
func (m *DefaultBaseMaster) IsMasterReady() bool
IsMasterReady implements BaseMaster.IsMasterReady
func (*DefaultBaseMaster) Logger ¶
func (m *DefaultBaseMaster) Logger() *zap.Logger
Logger implements BaseMaster.Logger
func (*DefaultBaseMaster) MasterID ¶
func (m *DefaultBaseMaster) MasterID() frameModel.MasterID
MasterID implements BaseMaster.MasterID
func (*DefaultBaseMaster) MasterMeta ¶
func (m *DefaultBaseMaster) MasterMeta() *frameModel.MasterMeta
MasterMeta implements BaseMaster.MasterMeta
func (*DefaultBaseMaster) MetaKVClient ¶
func (m *DefaultBaseMaster) MetaKVClient() metaModel.KVClient
MetaKVClient returns the business space metaclient
func (*DefaultBaseMaster) MetricFactory ¶
func (m *DefaultBaseMaster) MetricFactory() promutil.Factory
MetricFactory implements BaseMaster.MetricFactory
func (*DefaultBaseMaster) NotifyExit ¶
func (m *DefaultBaseMaster) NotifyExit(ctx context.Context, errIn error) error
NotifyExit implements BaseMaster.NotifyExit
func (*DefaultBaseMaster) Poll ¶
func (m *DefaultBaseMaster) Poll(ctx context.Context) error
Poll implements BaseMaster.Poll
func (*DefaultBaseMaster) PrepareWorkerConfig ¶
func (m *DefaultBaseMaster) PrepareWorkerConfig( workerType frameModel.WorkerType, config WorkerConfig, ) (rawConfig []byte, workerID frameModel.WorkerID, err error)
PrepareWorkerConfig extracts information from WorkerConfig into detail fields.
- If workerType is master type, the config is a `*MasterMeta` struct and contains a pre-allocated master ID and a JSON-marshalled config.
- If workerType is worker type, the config is a user defined config struct, we marshal it to byte slice as returned config, and generate a random WorkerID.
func (*DefaultBaseMaster) SetProjectInfo ¶
func (m *DefaultBaseMaster) SetProjectInfo(workerID frameModel.WorkerID, projectInfo tenant.ProjectInfo)
SetProjectInfo set the project info of specific worker [NOTICE]: Only used by JobManager to set project for different job(worker for jobmanager)
type DefaultBaseWorker ¶
type DefaultBaseWorker struct { Impl WorkerImpl // contains filtered or unexported fields }
DefaultBaseWorker implements BaseWorker interface, it also embeds an Impl which implements the WorkerImpl interface and passed from business logic.
func (*DefaultBaseWorker) Close ¶
func (w *DefaultBaseWorker) Close(ctx context.Context) error
Close implements BaseWorker.Close TODO remove the return value from the signature.
func (*DefaultBaseWorker) Exit ¶
func (w *DefaultBaseWorker) Exit(ctx context.Context, exitReason ExitReason, err error, extBytes []byte) (errRet error)
Exit implements BaseWorker.Exit
func (*DefaultBaseWorker) GetEnabledBucketStorage ¶
func (w *DefaultBaseWorker) GetEnabledBucketStorage() (bool, resModel.ResourceType)
GetEnabledBucketStorage implements BaseWorker.GetEnabledBucketStorage
func (*DefaultBaseWorker) ID ¶
func (w *DefaultBaseWorker) ID() runtime.RunnableID
ID implements BaseWorker.ID
func (*DefaultBaseWorker) Init ¶
func (w *DefaultBaseWorker) Init(ctx context.Context) error
Init implements BaseWorker.Init
func (*DefaultBaseWorker) Logger ¶
func (w *DefaultBaseWorker) Logger() *zap.Logger
Logger implements BaseWorker.Logger
func (*DefaultBaseWorker) MetaKVClient ¶
func (w *DefaultBaseWorker) MetaKVClient() metaModel.KVClient
MetaKVClient implements BaseWorker.MetaKVClient
func (*DefaultBaseWorker) MetricFactory ¶
func (w *DefaultBaseWorker) MetricFactory() promutil.Factory
MetricFactory implements BaseWorker.MetricFactory
func (*DefaultBaseWorker) NotifyExit ¶
func (w *DefaultBaseWorker) NotifyExit(ctx context.Context, errIn error) (retErr error)
NotifyExit implements BaseWorker.NotifyExit
func (*DefaultBaseWorker) OpenStorage ¶
func (w *DefaultBaseWorker) OpenStorage( ctx context.Context, resourcePath resModel.ResourceID, opts ...broker.OpenStorageOption, ) (broker.Handle, error)
OpenStorage implements BaseWorker.OpenStorage
func (*DefaultBaseWorker) Poll ¶
func (w *DefaultBaseWorker) Poll(ctx context.Context) error
Poll implements BaseWorker.Poll
func (*DefaultBaseWorker) SendMessage ¶
func (w *DefaultBaseWorker) SendMessage( ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool, ) error
SendMessage implements BaseWorker.SendMessage
func (*DefaultBaseWorker) Stop ¶
func (w *DefaultBaseWorker) Stop(ctx context.Context) error
Stop implements Worker.Stop, works the same as Worker.Close
func (*DefaultBaseWorker) UpdateStatus ¶
func (w *DefaultBaseWorker) UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error
UpdateStatus updates the worker's status and tries to notify the master. The status is persisted if State or ErrorMsg has changed. Refer to (*WorkerState).HasSignificantChange.
If UpdateStatus returns without an error, then the status must have been persisted, but there is no guarantee that the master has received a notification. Note that if the master cannot handle the notifications fast enough, notifications can be lost.
type ExitReason ¶
type ExitReason int
ExitReason is the type for exit reason
func WorkerStateToExitReason ¶
func WorkerStateToExitReason(code frameModel.WorkerState) ExitReason
WorkerStateToExitReason translates WorkerState to ExitReason TODO: business logic should not sense 'WorkerState'
type JobMasterImpl ¶
type JobMasterImpl interface { MasterImpl // OnCancel is triggered when a cancel message is received. It can be // triggered multiple times. // TODO: when it returns error, framework should close this jobmaster. OnCancel(ctx context.Context) error // OnOpenAPIInitialized is called as the first callback function of the JobMasterImpl // instance, the business logic should only register the OpenAPI handler in it. // The implementation must not retain the apiGroup. // Note: this function is called before Init(). // Concurrent safety: // - this function is called as the first callback function of an JobMasterImpl // instance, and it's not concurrent with other callbacks. OnOpenAPIInitialized(apiGroup *gin.RouterGroup) // IsJobMasterImpl is an empty function used to prevent accidental implementation // of this interface. IsJobMasterImpl() }
JobMasterImpl is the implementation of a job master of the dataflow engine. The implementation struct must embed the framework.BaseJobMaster interface; this interface will be initialized by the framework.
type Master ¶
type Master interface { Init(ctx context.Context) error Poll(ctx context.Context) error MasterID() frameModel.MasterID Close(ctx context.Context) error Stop(ctx context.Context) error NotifyExit(ctx context.Context, errIn error) error }
Master defines a basic interface that can run in dataflow engine runtime
type MasterFailoverReason ¶
type MasterFailoverReason struct { Code MasterFailoverReasonCode ErrorMsg string }
MasterFailoverReason contains failover reason code and error message
type MasterFailoverReasonCode ¶
type MasterFailoverReasonCode int32
MasterFailoverReasonCode is used as reason code
type MasterImpl ¶
type MasterImpl interface { // InitImpl is called at the first time the MasterImpl instance is initialized // after OnOpenAPIInitialized. When InitImpl returns without error, framework // will try to persist an internal state so further failover will call OnMasterRecovered // rather than InitImpl. // Return: // - error to let the framework call CloseImpl, and framework may retry InitImpl // later for some times. For non-retryable failure, business logic should // call Exit. // Concurrent safety: // - this function is not concurrent with other callbacks. InitImpl(ctx context.Context) error // OnMasterRecovered is called when the MasterImpl instance has failover from // error by framework. For this MasterImpl instance, it's called after OnOpenAPIInitialized. // Return: // - error to let the framework call CloseImpl. // Concurrent safety: // - this function is not concurrent with other callbacks. OnMasterRecovered(ctx context.Context) error // Tick is called on a fixed interval after MasterImpl's InitImpl or OnMasterRecovered, // business logic can do some periodic tasks here. // Return: // - error to let the framework call CloseImpl. // Concurrent safety: // - this function may be concurrently called with other callbacks except for // Tick itself, OnOpenAPIInitialized, InitImpl, OnMasterRecovered, CloseImpl, // StopImpl. Tick(ctx context.Context) error // OnWorkerDispatched is called when the asynchronized action of CreateWorker // is finished. Only after OnWorkerDispatched, OnWorkerOnline and OnWorkerStatusUpdated // of the same worker may be called. // Return: // - error to let the framework call CloseImpl. // Concurrent safety: // - this function may be concurrently called with another worker's OnWorkerXXX, // Tick, CloseImpl, StopImpl, OnCancel. OnWorkerDispatched(worker WorkerHandle, result error) error // OnWorkerOnline is called when the first heartbeat for a worker is received. 
// NOTE: OnWorkerOffline can appear without OnWorkerOnline // Return: // - error to let the framework call CloseImpl. // Concurrent safety: // - this function may be concurrently called with another worker's OnWorkerXXX, // Tick, CloseImpl, StopImpl, OnCancel, the same worker's OnWorkerStatusUpdated. OnWorkerOnline(worker WorkerHandle) error // OnWorkerOffline is called as the consequence of worker's Exit or heartbeat // timed out. It's the last callback function among OnWorkerXXX for a worker. // Return: // - error to let the framework call CloseImpl. // Concurrent safety: // - this function may be concurrently called with another worker's OnWorkerXXX, // Tick, CloseImpl, StopImpl, OnCancel. OnWorkerOffline(worker WorkerHandle, reason error) error // OnWorkerMessage is called when a customized message is received. OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message interface{}) error // OnWorkerStatusUpdated is called as the consequence of worker's UpdateStatus. // Return: // - error to let the framework call CloseImpl. // Concurrent safety: // - this function may be concurrently called with another worker's OnWorkerXXX, // Tick, CloseImpl, StopImpl, OnCancel, the same worker's OnWorkerOnline. OnWorkerStatusUpdated(worker WorkerHandle, newStatus *frameModel.WorkerStatus) error // CloseImpl is called as the consequence of returning error from InitImpl, // OnMasterRecovered or Tick, the Tick will be stopped after entering this function. // And framework may try to create a new masterImpl instance afterwards. // Business logic is expected to release resources here, but business developer // should be aware that when the runtime is crashed, CloseImpl has no time to // be called. // TODO: no other callbacks will be called after and concurrent with CloseImpl // Concurrent safety: // - this function may be concurrently called with OnWorkerMessage, OnCancel, // OnWorkerDispatched, OnWorkerOnline, OnWorkerOffline, OnWorkerStatusUpdated. 
CloseImpl(ctx context.Context) // StopImpl is called as the consequence of business logic calling Exit. Tick will // be stopped after entering this function, and framework will treat this MasterImpl // as non-recoverable. // There's at most one invocation to StopImpl after Exit. If the runtime is // crashed, StopImpl has no time to be called. // Concurrent safety: // - this function may be concurrently called with OnWorkerMessage, OnCancel, // OnWorkerDispatched, OnWorkerOnline, OnWorkerOffline, OnWorkerStatusUpdated. StopImpl(ctx context.Context) }
MasterImpl defines the interface to implement a master. Business logic can be added in the functions of this interface.
type MessageRouter ¶
type MessageRouter struct {
// contains filtered or unexported fields
}
MessageRouter is an SPSC (single producer, single consumer) work model. Since the message frequency is not high, we use a simple channel for message transit.
func NewMessageRouter ¶
func NewMessageRouter( workerID frameModel.WorkerID, pool workerpool.AsyncPool, bufferSize int, routeFn func(topic p2p.Topic, msg p2p.MessageValue) error, ) *MessageRouter
NewMessageRouter creates a new MessageRouter
func (*MessageRouter) AppendMessage ¶
func (r *MessageRouter) AppendMessage(topic p2p.Topic, msg p2p.MessageValue)
AppendMessage always appends a new message into the buffer. If the message buffer is full, it evicts the oldest message.
type MockHandle ¶
type MockHandle = master.MockHandle
MockHandle is a mock for WorkerHandle. Re-exported for testing.
type MockMasterImpl ¶
type MockMasterImpl struct { mock.Mock *DefaultBaseMaster // contains filtered or unexported fields }
MockMasterImpl implements a mock MasterImpl
func NewMockMasterImpl ¶
func NewMockMasterImpl(t *testing.T, masterID, id frameModel.MasterID) *MockMasterImpl
NewMockMasterImpl creates a new MockMasterImpl instance
func (*MockMasterImpl) CloseImpl ¶
func (m *MockMasterImpl) CloseImpl(ctx context.Context)
CloseImpl implements MasterImpl.CloseImpl
func (*MockMasterImpl) GetFrameMetaClient ¶
func (m *MockMasterImpl) GetFrameMetaClient() pkgOrm.Client
GetFrameMetaClient returns the framework meta client.
func (*MockMasterImpl) InitImpl ¶
func (m *MockMasterImpl) InitImpl(ctx context.Context) error
InitImpl implements MasterImpl.InitImpl
func (*MockMasterImpl) MasterClient ¶
func (m *MockMasterImpl) MasterClient() *client.MockServerMasterClient
MasterClient returns internal server master client
func (*MockMasterImpl) OnMasterRecovered ¶
func (m *MockMasterImpl) OnMasterRecovered(ctx context.Context) error
OnMasterRecovered implements MasterImpl.OnMasterRecovered
func (*MockMasterImpl) OnWorkerDispatched ¶
func (m *MockMasterImpl) OnWorkerDispatched(worker WorkerHandle, result error) error
OnWorkerDispatched implements MasterImpl.OnWorkerDispatched
func (*MockMasterImpl) OnWorkerMessage ¶
func (m *MockMasterImpl) OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message interface{}) error
OnWorkerMessage implements MasterImpl.OnWorkerMessage
func (*MockMasterImpl) OnWorkerOffline ¶
func (m *MockMasterImpl) OnWorkerOffline(worker WorkerHandle, reason error) error
OnWorkerOffline implements MasterImpl.OnWorkerOffline
func (*MockMasterImpl) OnWorkerOnline ¶
func (m *MockMasterImpl) OnWorkerOnline(worker WorkerHandle) error
OnWorkerOnline implements MasterImpl.OnWorkerOnline
func (*MockMasterImpl) OnWorkerStatusUpdated ¶
func (m *MockMasterImpl) OnWorkerStatusUpdated(worker WorkerHandle, newStatus *frameModel.WorkerStatus) error
OnWorkerStatusUpdated implements MasterImpl.OnWorkerStatusUpdated
func (*MockMasterImpl) StopImpl ¶
func (m *MockMasterImpl) StopImpl(ctx context.Context)
StopImpl implements MasterImpl.StopImpl
func (*MockMasterImpl) Tick ¶
func (m *MockMasterImpl) Tick(ctx context.Context) error
Tick implements MasterImpl.Tick
func (*MockMasterImpl) TickCount ¶
func (m *MockMasterImpl) TickCount() int64
TickCount returns the number of times Tick has been invoked.
type MockWorkerHandler ¶
type MockWorkerHandler struct { mock.Mock WorkerID frameModel.WorkerID }
MockWorkerHandler implements WorkerHandle, RunningHandle and TombstoneHandle interface
func (*MockWorkerHandler) CleanTombstone ¶
func (m *MockWorkerHandler) CleanTombstone(ctx context.Context) error
CleanTombstone implements TombstoneHandle.CleanTombstone
func (*MockWorkerHandler) GetTombstone ¶
func (m *MockWorkerHandler) GetTombstone() master.TombstoneHandle
GetTombstone implements WorkerHandle.GetTombstone
func (*MockWorkerHandler) ID ¶
func (m *MockWorkerHandler) ID() frameModel.WorkerID
ID implements WorkerHandle.ID
func (*MockWorkerHandler) IsTombStone ¶
func (m *MockWorkerHandler) IsTombStone() bool
IsTombStone implements WorkerHandle.IsTombStone
func (*MockWorkerHandler) SendMessage ¶
func (m *MockWorkerHandler) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error
SendMessage implements RunningHandle.SendMessage
func (*MockWorkerHandler) Status ¶
func (m *MockWorkerHandler) Status() *frameModel.WorkerStatus
Status implements WorkerHandle.Status
func (*MockWorkerHandler) Unwrap ¶
func (m *MockWorkerHandler) Unwrap() master.RunningHandle
Unwrap implements WorkerHandle.Unwrap
type Worker ¶
type Worker interface { Init(ctx context.Context) error Poll(ctx context.Context) error ID() runtime.RunnableID Close(ctx context.Context) error Stop(ctx context.Context) error NotifyExit(ctx context.Context, errIn error) error }
Worker defines an interface that provides all methods that will be used in the runtime (runner container).
type WorkerHandle ¶
type WorkerHandle = master.WorkerHandle
WorkerHandle alias to master.WorkerHandle
type WorkerImpl ¶
type WorkerImpl interface { // InitImpl is called as the consequence of CreateWorker from jobmaster or failover, // business logic is expected to do initialization here. // Return: // - error to let the framework call CloseImpl. // Concurrent safety: // - this function is called as the first callback function of a WorkerImpl // instance, and it's not concurrent with other callbacks. InitImpl(ctx context.Context) error // Tick is called on a fixed interval after WorkerImpl is initialized, business // logic can do some periodic tasks here. // Return: // - error to let the framework call CloseImpl. // Concurrent safety: // - this function may be concurrently called with OnMasterMessage. Tick(ctx context.Context) error // OnMasterMessage is called when worker receives master message, business developer // does not need to implement it. // TODO: move it out of WorkerImpl and should not be concurrent with CloseImpl. OnMasterMessage(ctx context.Context, topic p2p.Topic, message p2p.MessageValue) error // CloseImpl is called as the consequence of returning error from InitImpl or // Tick, the Tick will be stopped after entering this function. Business logic // is expected to release resources here, but business developer should be aware // that when the runtime is crashed, CloseImpl has no time to be called. // CloseImpl will only be called once. // TODO: no other callbacks will be called after CloseImpl // Concurrent safety: // - this function may be concurrently called with OnMasterMessage. CloseImpl(ctx context.Context) }
WorkerImpl is the implementation of a worker of the dataflow engine. The implementation struct must embed the framework.BaseWorker interface; this interface will be initialized by the framework.