Documentation ¶
Index ¶
- Constants
- Variables
- func HeartbeatPingTopic(masterID MasterID) p2p.Topic
- func HeartbeatPongTopic(masterID MasterID, workerID WorkerID) p2p.Topic
- func WorkerStatusChangeRequestTopic(masterID MasterID, workerID WorkerID) p2p.Topic
- type Epoch
- type HeartbeatPingMessage
- type HeartbeatPongMessage
- type MasterID
- type MasterMeta
- func (m *MasterMeta) ExitValues() ormModel.KeyValueMap
- func (m *MasterMeta) Marshal() ([]byte, error)
- func (m *MasterMeta) RefreshValues() ormModel.KeyValueMap
- func (m *MasterMeta) Unmarshal(data []byte) error
- func (m *MasterMeta) UpdateErrorValues() ormModel.KeyValueMap
- func (m *MasterMeta) UpdateStateValues() ormModel.KeyValueMap
- type MasterMetaExt
- type MasterState
- type StatusChangeRequest
- type WorkerID
- type WorkerState
- type WorkerStatus
- type WorkerType
Constants ¶
const (
	MasterStateUninit   = MasterState(1)
	MasterStateInit     = MasterState(2)
	MasterStateFinished = MasterState(3)
	MasterStateStopped  = MasterState(4)
	MasterStateFailed   = MasterState(5)
)
Job master statuses. NOTICE: DO NOT CHANGE the existing status codes. Update the MasterMeta.State comment if you add a new status code.
const (
	JobManager = WorkerType(iota + 1)
	// job master
	CvsJobMaster
	FakeJobMaster
	DMJobMaster
	CdcJobMaster
	// task
	CvsTask
	FakeTask
	DmTask
	CdcTask
	// worker
	WorkerDMDump
	WorkerDMLoad
	WorkerDMSync
)
Defines all task types. TODO: refine this. Currently, adding a new worker type or job type requires modifying code in many places. NOTICE: DO NOT CHANGE the existing worker types. Update the comment in the model if you add a new worker type.
const (
	WorkerStateNormal   = WorkerState(1)
	WorkerStateCreated  = WorkerState(2)
	WorkerStateInit     = WorkerState(3)
	WorkerStateError    = WorkerState(4)
	WorkerStateFinished = WorkerState(5)
	WorkerStateStopped  = WorkerState(6)
)
Among these statuses, only WorkerStateCreated is used by the framework for now; the rest are for the business logic to use. TODO: think about whether to manage the transitions between statuses. TODO: need an FSM graph. NOTICE: DO NOT CHANGE the existing status codes. Update the WorkerStatus.State comment if you add a new status code.
Variables ¶
var MasterUpdateColumns = []string{
"updated_at",
"project_id",
"id",
"type",
"state",
"node_id",
"address",
"epoch",
"config",
"error_message",
"detail",
"ext",
}
MasterUpdateColumns is used in gorm update. TODO: use reflection to generate it more generally; the list depends on gorm implementation details.
var WorkerUpdateColumns = []string{
"updated_at",
"project_id",
"job_id",
"id",
"type",
"state",
"epoch",
"error_message",
"extend_bytes",
}
WorkerUpdateColumns is used in gorm update. TODO: use reflection to generate it more generally; the list depends on gorm implementation details.
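The TODO above suggests deriving the column list by reflection instead of maintaining it by hand. The following is a minimal sketch of that idea, not the package's implementation: `workerRow` and `gormColumns` are hypothetical names, the struct is a trimmed copy of a few WorkerStatus fields, and the helper only reads the `column:` part of each `gorm` tag. It does not descend into embedded structs such as ormModel.Model, so a column like `updated_at` would still need separate handling.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// workerRow is a hypothetical, trimmed stand-in for WorkerStatus.
// The gorm tags are copied from the documented struct.
type workerRow struct {
	ProjectID string `gorm:"column:project_id;type:varchar(128) not null"`
	JobID     string `gorm:"column:job_id;type:varchar(128) not null"`
	Epoch     int64  `gorm:"column:epoch;type:bigint not null"`
	Note      string // no gorm tag, so it is skipped
}

// gormColumns (hypothetical helper) collects the names that follow
// "column:" in the gorm tags of a struct's direct fields.
func gormColumns(v interface{}) []string {
	t := reflect.TypeOf(v)
	if t == nil {
		return nil
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	var cols []string
	for i := 0; i < t.NumField(); i++ {
		tag := t.Field(i).Tag.Get("gorm")
		for _, part := range strings.Split(tag, ";") {
			// TrimPrefix returns the input unchanged when the prefix is absent.
			if name := strings.TrimPrefix(part, "column:"); name != part {
				cols = append(cols, name)
			}
		}
	}
	return cols
}

func main() {
	fmt.Println(gormColumns(workerRow{}))
}
```

A generated list keeps the update columns in step with the struct definition, at the cost of coupling the code to gorm's tag syntax.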
Functions ¶
func HeartbeatPingTopic ¶
HeartbeatPingTopic is the heartbeat ping message topic; each master has a unique one.
func HeartbeatPongTopic ¶
HeartbeatPongTopic is the heartbeat pong message topic; each worker has a unique one.
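To illustrate how per-master and per-worker topics can be kept unique, here is a small sketch that builds topic names from the IDs. The format strings and the helper names `pingTopic` and `pongTopic` are assumptions made for this example, and `Topic` is a local string alias standing in for p2p.Topic; the actual topic layout is not shown in this documentation.

```go
package main

import "fmt"

// Local aliases mirroring the documented ID types. Topic stands in for
// p2p.Topic and is assumed to be string-like.
type (
	MasterID = string
	WorkerID = string
	Topic    = string
)

// pingTopic (hypothetical) yields one topic per master.
func pingTopic(masterID MasterID) Topic {
	return fmt.Sprintf("heartbeat-ping-%s", masterID)
}

// pongTopic (hypothetical) yields one topic per (master, worker) pair,
// so each worker listens on its own topic.
func pongTopic(masterID MasterID, workerID WorkerID) Topic {
	return fmt.Sprintf("heartbeat-pong-%s-%s", masterID, workerID)
}

func main() {
	fmt.Println(pingTopic("master-1"))
	fmt.Println(pongTopic("master-1", "worker-7"))
}
```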
Types ¶
type HeartbeatPingMessage ¶
type HeartbeatPingMessage struct {
	SendTime     clock.MonotonicTime `json:"send-time"`
	FromWorkerID WorkerID            `json:"from-worker-id"`
	Epoch        Epoch               `json:"epoch"`
	WorkerEpoch  Epoch               `json:"worker-epoch"`
	IsFinished   bool                `json:"is-finished"`
}
HeartbeatPingMessage ships information in heartbeat ping
type HeartbeatPongMessage ¶
type HeartbeatPongMessage struct {
	SendTime   clock.MonotonicTime `json:"send-time"`
	ReplyTime  time.Time           `json:"reply-time"`
	ToWorkerID WorkerID            `json:"to-worker-id"`
	Epoch      Epoch               `json:"epoch"`
	IsFinished bool                `json:"is-finished"`
}
HeartbeatPongMessage ships information in heartbeat pong
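The JSON tags on the heartbeat messages determine the wire field names. The sketch below encodes a local copy of the ping message to show the resulting keys. It makes two assumptions: `time.Duration` stands in for clock.MonotonicTime and `int64` stands in for Epoch, since neither definition appears on this page. `pingMessage` and `encodePing` are names invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// pingMessage mirrors the documented HeartbeatPingMessage fields and tags.
// SendTime and the epoch types are stand-ins (assumptions).
type pingMessage struct {
	SendTime     time.Duration `json:"send-time"`
	FromWorkerID string        `json:"from-worker-id"`
	Epoch        int64         `json:"epoch"`
	WorkerEpoch  int64         `json:"worker-epoch"`
	IsFinished   bool          `json:"is-finished"`
}

// encodePing returns the JSON text for a ping message.
func encodePing(msg pingMessage) (string, error) {
	data, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	out, err := encodePing(pingMessage{
		SendTime:     5 * time.Second,
		FromWorkerID: "worker-7",
		Epoch:        3,
		WorkerEpoch:  1,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```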
type MasterID ¶
type MasterID = string
MasterID is the master ID in the master-worker framework.
- It is the job manager ID when the master is the job manager and the worker is a job master.
- It is the job master ID when the master is a job master and the worker is a worker.
type MasterMeta ¶
type MasterMeta struct {
	ormModel.Model
	ProjectID tenant.ProjectID `json:"project-id" gorm:"column:project_id;type:varchar(128) not null;index:idx_mst,priority:1"`
	ID        MasterID         `json:"id" gorm:"column:id;type:varchar(128) not null;uniqueIndex:uidx_mid"`
	Type      WorkerType       `` /* 139-byte string literal not displayed */
	State     MasterState      `` /* 138-byte string literal not displayed */
	NodeID    p2p.NodeID       `json:"node-id" gorm:"column:node_id;type:varchar(128) not null"`
	Addr      string           `json:"addr" gorm:"column:address;type:varchar(256) not null"`
	Epoch     Epoch            `json:"epoch" gorm:"column:epoch;type:bigint not null"`

	// Config holds business-specific data
	Config []byte `json:"config" gorm:"column:config;type:blob"`

	// error message for the job
	ErrorMsg string `json:"error-message" gorm:"column:error_message;type:text"`

	// if job is finished or canceled, business logic can set self-defined job info to `Detail`
	Detail []byte `json:"detail" gorm:"column:detail;type:blob"`

	Ext MasterMetaExt `json:"ext" gorm:"column:ext;type:JSON"`

	// Deleted is a nullable timestamp. The master is deleted
	// if Deleted is not null.
	Deleted gorm.DeletedAt
}
MasterMeta defines the metadata of job master
func (*MasterMeta) ExitValues ¶
func (m *MasterMeta) ExitValues() ormModel.KeyValueMap
ExitValues is used to generate orm value map when job master exits.
func (*MasterMeta) Marshal ¶
func (m *MasterMeta) Marshal() ([]byte, error)
Marshal returns the JSON encoding of MasterMeta.
func (*MasterMeta) RefreshValues ¶
func (m *MasterMeta) RefreshValues() ormModel.KeyValueMap
RefreshValues is used to generate orm value map when refreshing metadata.
func (*MasterMeta) Unmarshal ¶
func (m *MasterMeta) Unmarshal(data []byte) error
Unmarshal parses the JSON-encoded data and stores the result to MasterMeta
func (*MasterMeta) UpdateErrorValues ¶
func (m *MasterMeta) UpdateErrorValues() ormModel.KeyValueMap
UpdateErrorValues is used to generate orm value map when job master meets error and records it.
func (*MasterMeta) UpdateStateValues ¶
func (m *MasterMeta) UpdateStateValues() ormModel.KeyValueMap
UpdateStateValues is used to generate orm value map when updating state of master meta.
type MasterMetaExt ¶
MasterMetaExt stores some attributes of job masters that do not need to be indexed.
func (*MasterMetaExt) Scan ¶
func (e *MasterMetaExt) Scan(rawInput interface{}) error
Scan implements sql.Scanner.
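For a column stored as JSON, a sql.Scanner implementation typically accepts the raw driver value and decodes it into the receiver. The sketch below shows one common way to do that; it is not the package's code. `metaExt` and its `Note` field are hypothetical, because the fields of MasterMetaExt are not listed on this page.

```go
package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
)

// metaExt is a hypothetical stand-in for MasterMetaExt.
type metaExt struct {
	Note string `json:"note"` // illustrative field, not from the source
}

// Compile-time check that *metaExt satisfies sql.Scanner.
var _ sql.Scanner = (*metaExt)(nil)

// Scan decodes a JSON column value into the receiver.
// NULL and empty values reset the receiver to its zero value.
func (e *metaExt) Scan(rawInput interface{}) error {
	*e = metaExt{}
	if rawInput == nil {
		return nil
	}
	var raw []byte
	switch v := rawInput.(type) {
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return fmt.Errorf("unsupported scan type %T", rawInput)
	}
	if len(raw) == 0 {
		return nil
	}
	return json.Unmarshal(raw, e)
}

func main() {
	var ext metaExt
	if err := ext.Scan([]byte(`{"note":"hello"}`)); err != nil {
		panic(err)
	}
	fmt.Println(ext.Note)
}
```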
type MasterState ¶
type MasterState int8
MasterState is used in framework to manage job status
func (MasterState) IsTerminatedState ¶
func (code MasterState) IsTerminatedState() bool
IsTerminatedState checks whether master state is terminated
type StatusChangeRequest ¶
type StatusChangeRequest struct {
	SendTime     clock.MonotonicTime `json:"send-time"`
	FromMasterID MasterID            `json:"from-master-id"`
	Epoch        Epoch               `json:"epoch"`
	ExpectState  WorkerState         `json:"expect-state"`
}
StatusChangeRequest ships information when updating worker status
type WorkerID ¶
type WorkerID = string
WorkerID is the worker ID in the master-worker framework.
- It is the job master ID when the master is the job manager and the worker is a job master.
- It is the worker ID when the master is a job master and the worker is a worker.
type WorkerState ¶
type WorkerState int8
WorkerState represents the worker's running status in the master-worker framework. TODO: add an FSM for WorkerState.
type WorkerStatus ¶
type WorkerStatus struct {
	ormModel.Model
	ProjectID tenant.ProjectID `json:"project-id" gorm:"column:project_id;type:varchar(128) not null"`
	JobID     MasterID         `json:"job-id" gorm:"column:job_id;type:varchar(128) not null;uniqueIndex:uidx_wid,priority:1;index:idx_wst,priority:1"`
	ID        WorkerID         `json:"id" gorm:"column:id;type:varchar(128) not null;uniqueIndex:uidx_wid,priority:2"`
	Type      WorkerType       `` /* 234-byte string literal not displayed */
	State     WorkerState      `` /* 148-byte string literal not displayed */
	Epoch     Epoch            `json:"epoch" gorm:"column:epoch;type:bigint not null"`
	ErrorMsg  string           `json:"error-message" gorm:"column:error_message;type:text"`

	// ExtBytes carries the serialized form of the Ext field, which is used in
	// business logic only.
	// Business logic can parse the raw bytes and decode into business Go object
	ExtBytes []byte `json:"extend-bytes" gorm:"column:extend_bytes;type:blob"`
}
WorkerStatus records worker information in the metastore, including the master ID, worker ID, worker type, project ID (tenant), worker status (used in the master-worker framework), error message, and ext bytes (passed from business logic).
func (WorkerStatus) HasSignificantChange ¶
func (s WorkerStatus) HasSignificantChange(other *WorkerStatus) bool
HasSignificantChange indicates whether `s` has significant changes worth persisting.
func (*WorkerStatus) InTerminateState ¶
func (s *WorkerStatus) InTerminateState() bool
InTerminateState returns whether the worker is in a terminal state: finished, stopped, or error.
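The description above names the three terminal states, which can be sketched as a simple switch. The constants are copied from the Constants section; `workerStatus` and `inTerminateState` are local, hypothetical stand-ins rather than the package's own definitions.

```go
package main

import "fmt"

// WorkerState and its values mirror the documented constants.
type WorkerState int8

const (
	WorkerStateNormal   = WorkerState(1)
	WorkerStateCreated  = WorkerState(2)
	WorkerStateInit     = WorkerState(3)
	WorkerStateError    = WorkerState(4)
	WorkerStateFinished = WorkerState(5)
	WorkerStateStopped  = WorkerState(6)
)

// workerStatus is a trimmed stand-in that keeps only the State field.
type workerStatus struct {
	State WorkerState
}

// inTerminateState reports whether the state is finished, stopped, or error.
func (s *workerStatus) inTerminateState() bool {
	switch s.State {
	case WorkerStateFinished, WorkerStateStopped, WorkerStateError:
		return true
	default:
		return false
	}
}

func main() {
	states := []WorkerState{
		WorkerStateNormal, WorkerStateCreated, WorkerStateInit,
		WorkerStateError, WorkerStateFinished, WorkerStateStopped,
	}
	for _, st := range states {
		s := &workerStatus{State: st}
		fmt.Printf("state=%d terminal=%v\n", st, s.inTerminateState())
	}
}
```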
func (*WorkerStatus) Map ¶
func (s *WorkerStatus) Map() map[string]interface{}
Map is used to update the ORM model.
func (*WorkerStatus) Marshal ¶
func (s *WorkerStatus) Marshal() ([]byte, error)
Marshal returns the JSON encoding of WorkerStatus.
func (*WorkerStatus) Unmarshal ¶
func (s *WorkerStatus) Unmarshal(bytes []byte) error
Unmarshal parses the JSON-encoded data and stores the result into a WorkerStatus
type WorkerType ¶
type WorkerType int16
WorkerType represents task type, such as DM worker, DM master, etc.
func (WorkerType) MarshalJSON ¶
func (wt WorkerType) MarshalJSON() ([]byte, error)
MarshalJSON marshals the enum as a quoted json string
func (WorkerType) String ¶
func (wt WorkerType) String() string
String implements fmt.Stringer interface
func (*WorkerType) UnmarshalJSON ¶
func (wt *WorkerType) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals a quoted JSON string to the enum value.