Documentation ¶
Overview ¶
Package orchestrator mainly implements an ETCD worker. An ETCD worker reads data from and writes data to ETCD servers based on snapshots and data patches. Here is a detailed description of how the ETCD worker works:
                ETCD Servers
                |       ^
                |       |
       1. Watch |       | 5. Txn
                |       |
                v       |
                EtcdWorker
                |       ^
                |       |
       2. Update|       | 4. DataPatch
       +--------+       +-------+
       |                        |
       |                        |
       v         3.Tick         |
    ReactorState ----------> Reactor

1. EtcdWorker watches the txn modification log from the ETCD servers.
2. EtcdWorker applies the modifications it received from the ETCD servers by calling the Update function of ReactorState.
3. EtcdWorker calls the Tick function of Reactor, and EtcdWorker makes sure the state of ReactorState is a consistent snapshot of the ETCD servers.
4. Reactor is implemented by the upper layer application. Usually, Reactor produces DataPatches when the Tick function is called. EtcdWorker applies all the DataPatches produced by Reactor.
5. EtcdWorker commits a txn to ETCD according to the DataPatches.
The upper layer application, which is the user of EtcdWorker, only needs to implement the Reactor and ReactorState interfaces. The ReactorState maintains a local view of the ETCD state, and the Reactor produces DataPatches according to the ReactorState. The EtcdWorker makes sure that any ReactorState perceived by the Reactor is a consistent snapshot of the ETCD servers.
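The watch, update, tick, patch, and commit cycle described in this overview can be sketched with an in-memory map playing the role of ETCD. This is a minimal sketch under stated assumptions, not the package's implementation: the interfaces are simplified local stand-ins (string keys, no UpdatePendingChange), and setPatch, mirrorState, ackReactor, runOnce, and demo are hypothetical names introduced for illustration. Patches are applied directly to the map, whereas the real worker commits them through an ETCD transaction.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified local stand-ins for the package's interfaces (string keys for brevity).
type DataPatch interface {
	Patch(valueMap map[string][]byte, changedSet map[string]struct{}) error
}

type ReactorState interface {
	Update(key string, value []byte, isInit bool) error
	GetPatches() [][]DataPatch
}

type Reactor interface {
	Tick(ctx context.Context, state ReactorState) (ReactorState, error)
}

// setPatch is a hypothetical patch that overwrites a single key.
type setPatch struct {
	key   string
	value []byte
}

func (p setPatch) Patch(valueMap map[string][]byte, changedSet map[string]struct{}) error {
	valueMap[p.key] = p.value
	changedSet[p.key] = struct{}{}
	return nil
}

// mirrorState keeps a snapshot of the store and queues patches for the next commit.
type mirrorState struct {
	snapshot map[string]string
	pending  [][]DataPatch
}

func (s *mirrorState) Update(key string, value []byte, _ bool) error {
	s.snapshot[key] = string(value)
	return nil
}

func (s *mirrorState) GetPatches() [][]DataPatch {
	out := s.pending
	s.pending = nil
	return out
}

// ackReactor queues a write to "/ack" whenever the snapshot contains "/request".
type ackReactor struct{}

func (ackReactor) Tick(_ context.Context, state ReactorState) (ReactorState, error) {
	s := state.(*mirrorState)
	if v, ok := s.snapshot["/request"]; ok {
		s.pending = append(s.pending, []DataPatch{setPatch{key: "/ack", value: []byte("seen:" + v)}})
	}
	return s, nil
}

// runOnce walks through steps 2 to 5 for one batch of watched changes.
func runOnce(ctx context.Context, store, watched map[string][]byte, r Reactor, st ReactorState) error {
	for k, v := range watched { // step 2: replay watched changes into the state
		if err := st.Update(k, v, false); err != nil {
			return err
		}
	}
	next, err := r.Tick(ctx, st) // step 3: the reactor sees the updated snapshot
	if err != nil {
		return err
	}
	for _, group := range next.GetPatches() { // step 4: collect the patches
		changed := map[string]struct{}{}
		for _, p := range group { // step 5: each group stands in for one txn
			if err := p.Patch(store, changed); err != nil {
				return err
			}
		}
	}
	return nil
}

// demo runs one round against an in-memory map that plays the role of ETCD.
func demo() (string, error) {
	store := map[string][]byte{"/request": []byte("42")}
	watched := map[string][]byte{"/request": []byte("42")}
	st := &mirrorState{snapshot: map[string]string{}}
	err := runOnce(context.Background(), store, watched, ackReactor{}, st)
	return string(store["/ack"]), err
}

func main() {
	ack, err := demo()
	fmt.Println(ack, err) // prints "seen:42 <nil>"
}
```

The reactor never touches the store itself. It only reads the snapshot and queues patches, which is what lets the worker decide when and how to commit them.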
Index ¶
- func InitMetrics(registry *prometheus.Registry)
- type ChangefeedReactorState
- func (s *ChangefeedReactorState) Active(captureID model.CaptureID) bool
- func (s *ChangefeedReactorState) CheckCaptureAlive(captureID model.CaptureID)
- func (s *ChangefeedReactorState) CheckChangefeedNormal()
- func (s *ChangefeedReactorState) CleanUpTaskPositions()
- func (s *ChangefeedReactorState) Exist() bool
- func (s *ChangefeedReactorState) GetChangefeedInfo() *model.ChangeFeedInfo
- func (s *ChangefeedReactorState) GetChangefeedStatus() *model.ChangeFeedStatus
- func (s *ChangefeedReactorState) GetID() model.ChangeFeedID
- func (s *ChangefeedReactorState) GetPatches() [][]DataPatch
- func (s *ChangefeedReactorState) PatchInfo(fn func(*model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error))
- func (s *ChangefeedReactorState) PatchStatus(fn func(*model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error))
- func (s *ChangefeedReactorState) PatchTaskPosition(captureID model.CaptureID, ...)
- func (s *ChangefeedReactorState) RemoveChangefeed()
- func (s *ChangefeedReactorState) ResumeChangefeed(overwriteCheckpointTs uint64)
- func (s *ChangefeedReactorState) SetError(lastError *model.RunningError)
- func (s *ChangefeedReactorState) SetWarning(lastError *model.RunningError)
- func (s *ChangefeedReactorState) TakeProcessorErrors() []*model.RunningError
- func (s *ChangefeedReactorState) TakeProcessorWarnings() []*model.RunningError
- func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error
- func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) error
- func (s *ChangefeedReactorState) UpdateChangefeedState(feedState model.FeedState, adminJobType model.AdminJobType, epoch uint64)
- func (s *ChangefeedReactorState) UpdatePendingChange()
- type DataPatch
- type EtcdWorker
- type GlobalReactorState
- func (s *GlobalReactorState) GetPatches() [][]DataPatch
- func (s *GlobalReactorState) SetOnCaptureAdded(f func(captureID model.CaptureID, addr string))
- func (s *GlobalReactorState) SetOnCaptureRemoved(f func(captureID model.CaptureID))
- func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error
- func (s *GlobalReactorState) UpdatePendingChange()
- type Reactor
- type ReactorState
- type ReactorStateTester
- type SingleDataPatch
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics in this file
Types ¶
type ChangefeedReactorState ¶
type ChangefeedReactorState struct {
	ClusterID string
	ID model.ChangeFeedID
	Info *model.ChangeFeedInfo
	Status *model.ChangeFeedStatus
	TaskPositions map[model.CaptureID]*model.TaskPosition
	// contains filtered or unexported fields
}
ChangefeedReactorState represents a changefeed state which stores all key-value pairs of a changefeed in ETCD
func NewChangefeedReactorState ¶
func NewChangefeedReactorState(clusterID string, id model.ChangeFeedID) *ChangefeedReactorState
NewChangefeedReactorState creates a new changefeed reactor state
func (*ChangefeedReactorState) Active ¶
func (s *ChangefeedReactorState) Active(captureID model.CaptureID) bool
Active returns true if the changefeed is ready to be processed.
func (*ChangefeedReactorState) CheckCaptureAlive ¶
func (s *ChangefeedReactorState) CheckCaptureAlive(captureID model.CaptureID)
CheckCaptureAlive checks if the capture is alive. If the capture is offline, the etcd worker will exit and return the ErrLeaseExpired error.
func (*ChangefeedReactorState) CheckChangefeedNormal ¶
func (s *ChangefeedReactorState) CheckChangefeedNormal()
CheckChangefeedNormal checks if the changefeed state is runnable. If the changefeed is not runnable, the etcd worker will skip all patches of this tick. The processor should call this function every tick to make sure the changefeed is runnable.
func (*ChangefeedReactorState) CleanUpTaskPositions ¶
func (s *ChangefeedReactorState) CleanUpTaskPositions()
CleanUpTaskPositions removes the task positions of the changefeed.
func (*ChangefeedReactorState) Exist ¶
func (s *ChangefeedReactorState) Exist() bool
Exist returns false if none of the keys of this changefeed exist in ETCD.
func (*ChangefeedReactorState) GetChangefeedInfo ¶
func (s *ChangefeedReactorState) GetChangefeedInfo() *model.ChangeFeedInfo
GetChangefeedInfo returns the changefeed info.
func (*ChangefeedReactorState) GetChangefeedStatus ¶
func (s *ChangefeedReactorState) GetChangefeedStatus() *model.ChangeFeedStatus
GetChangefeedStatus returns the changefeed status.
func (*ChangefeedReactorState) GetID ¶
func (s *ChangefeedReactorState) GetID() model.ChangeFeedID
GetID returns the changefeed ID.
func (*ChangefeedReactorState) GetPatches ¶
func (s *ChangefeedReactorState) GetPatches() [][]DataPatch
GetPatches implements the ReactorState interface
func (*ChangefeedReactorState) PatchInfo ¶
func (s *ChangefeedReactorState) PatchInfo(fn func(*model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error))
PatchInfo appends a DataPatch which can modify the ChangeFeedInfo
func (*ChangefeedReactorState) PatchStatus ¶
func (s *ChangefeedReactorState) PatchStatus(fn func(*model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error))
PatchStatus appends a DataPatch which can modify the ChangeFeedStatus
func (*ChangefeedReactorState) PatchTaskPosition ¶
func (s *ChangefeedReactorState) PatchTaskPosition(captureID model.CaptureID, fn func(*model.TaskPosition) (*model.TaskPosition, bool, error))
PatchTaskPosition appends a DataPatch which can modify the TaskPosition of a specified capture
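PatchInfo, PatchStatus, and PatchTaskPosition all take a callback that receives the current value and returns the new value, a changed flag, and an error. The following is a hedged sketch of how such a typed callback could be wrapped into a bytes-to-bytes patch function. The position struct, its JSON encoding, and the helpers patchPosition and advance are hypothetical; in particular, treating a nil result as a deletion is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// position is a hypothetical stand-in for model.TaskPosition.
type position struct {
	CheckpointTs uint64 `json:"checkpoint-ts"`
}

// patchPosition wraps a typed callback into a function over raw bytes:
// decode the old value, run the callback, and encode the result if it changed.
func patchPosition(fn func(*position) (*position, bool, error)) func(old []byte) ([]byte, bool, error) {
	return func(old []byte) ([]byte, bool, error) {
		var cur *position // stays nil when the key does not exist yet
		if len(old) > 0 {
			cur = new(position)
			if err := json.Unmarshal(old, cur); err != nil {
				return nil, false, err
			}
		}
		next, changed, err := fn(cur)
		if err != nil || !changed {
			return nil, false, err
		}
		if next == nil {
			return nil, true, nil // assumption: a nil result means "delete the key"
		}
		out, err := json.Marshal(next)
		return out, true, err
	}
}

// advance moves the checkpoint forward and reports "unchanged" otherwise.
func advance(to uint64) func(*position) (*position, bool, error) {
	return func(p *position) (*position, bool, error) {
		if p == nil {
			return &position{CheckpointTs: to}, true, nil
		}
		if p.CheckpointTs >= to {
			return p, false, nil
		}
		p.CheckpointTs = to
		return p, true, nil
	}
}

func demo() (string, bool, error) {
	out, changed, err := patchPosition(advance(200))([]byte(`{"checkpoint-ts":100}`))
	return string(out), changed, err
}

func main() {
	out, changed, err := demo()
	fmt.Println(out, changed, err) // prints {"checkpoint-ts":200} true <nil>
}
```

Returning false for the changed flag lets a caller skip the write entirely, which keeps no-op ticks from producing ETCD traffic.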
func (*ChangefeedReactorState) RemoveChangefeed ¶
func (s *ChangefeedReactorState) RemoveChangefeed()
RemoveChangefeed removes the changefeed and cleans up its information and status.
func (*ChangefeedReactorState) ResumeChangefeed ¶
func (s *ChangefeedReactorState) ResumeChangefeed(overwriteCheckpointTs uint64)
ResumeChangefeed resumes the changefeed and sets the checkpoint ts.
func (*ChangefeedReactorState) SetError ¶
func (s *ChangefeedReactorState) SetError(lastError *model.RunningError)
SetError sets the error on the changefeed.
func (*ChangefeedReactorState) SetWarning ¶
func (s *ChangefeedReactorState) SetWarning(lastError *model.RunningError)
SetWarning sets the warning on the changefeed.
func (*ChangefeedReactorState) TakeProcessorErrors ¶
func (s *ChangefeedReactorState) TakeProcessorErrors() []*model.RunningError
TakeProcessorErrors returns the errors of the changefeed and clears them.
func (*ChangefeedReactorState) TakeProcessorWarnings ¶
func (s *ChangefeedReactorState) TakeProcessorWarnings() []*model.RunningError
TakeProcessorWarnings returns the warnings of the changefeed and clears them.
func (*ChangefeedReactorState) UpdateCDCKey ¶
func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) error
UpdateCDCKey updates the state by a parsed etcd key
func (*ChangefeedReactorState) UpdateChangefeedState ¶
func (s *ChangefeedReactorState) UpdateChangefeedState(feedState model.FeedState, adminJobType model.AdminJobType, epoch uint64)
UpdateChangefeedState updates the feed state, admin job type, and epoch of the changefeed.
func (*ChangefeedReactorState) UpdatePendingChange ¶
func (s *ChangefeedReactorState) UpdatePendingChange()
UpdatePendingChange implements the ReactorState interface
type DataPatch ¶
type DataPatch interface {
Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error
}
DataPatch represents an update of state
type EtcdWorker ¶
type EtcdWorker struct {
// contains filtered or unexported fields
}
EtcdWorker handles all interactions with Etcd
func NewEtcdWorker ¶
func NewEtcdWorker(
	client etcd.CDCEtcdClient,
	prefix string,
	reactor Reactor,
	initState ReactorState,
	migrator migrate.Migrator,
) (*EtcdWorker, error)
NewEtcdWorker returns a new EtcdWorker
func (*EtcdWorker) Run ¶
func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, role string) error
Run starts the EtcdWorker event loop. A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event. If the specified etcd session is Done, Run exits with cerrors.ErrEtcdSessionDone. The session may be nil.
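An event loop that ticks on either a timer or a watch event, and exits when a session ends, can be sketched with a single select statement. This is an illustration only: runLoop, errSessionDone, and demo are hypothetical names, and the channels are injected so the example stays deterministic. Passing a nil sessionDone channel is harmless here because a receive on a nil channel never becomes ready in a select.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errSessionDone is a local stand-in for cerrors.ErrEtcdSessionDone.
var errSessionDone = errors.New("etcd session done")

// runLoop calls tick once per timer firing and once per watch event.
// It returns when the context is cancelled, the session ends, or events is closed.
func runLoop(
	ctx context.Context,
	events <-chan string,
	timer <-chan time.Time,
	sessionDone <-chan struct{}, // may be nil: a nil channel never fires
	tick func(),
) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-sessionDone:
			return errSessionDone
		case <-timer:
			tick()
		case _, ok := <-events:
			if !ok {
				return nil
			}
			tick()
		}
	}
}

// demo queues one watch event and one timer firing, then ends the session
// from inside the tick callback once both have been handled.
func demo() (int, error) {
	events := make(chan string, 1)
	timer := make(chan time.Time, 1)
	done := make(chan struct{})
	events <- "put /some/key"
	timer <- time.Now()

	ticks := 0
	err := runLoop(context.Background(), events, timer, done, func() {
		ticks++
		if ticks == 2 {
			close(done)
		}
	})
	return ticks, err
}

func main() {
	ticks, err := demo()
	fmt.Println(ticks, err) // prints "2 etcd session done"
}
```

A real loop would typically feed the timer case from time.NewTicker(timerInterval) and the events case from a watch channel.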
type GlobalReactorState ¶
type GlobalReactorState struct {
	ClusterID string
	Role string
	Owner map[string]struct{}
	Captures map[model.CaptureID]*model.CaptureInfo
	Upstreams map[model.UpstreamID]*model.UpstreamInfo
	Changefeeds map[model.ChangeFeedID]*ChangefeedReactorState
	// contains filtered or unexported fields
}
GlobalReactorState represents a global state which stores all key-value pairs in ETCD
func NewGlobalState ¶
func NewGlobalState(clusterID string, captureSessionTTL int) *GlobalReactorState
NewGlobalState creates a new global state.
func NewGlobalStateForTest ¶
func NewGlobalStateForTest(clusterID string) *GlobalReactorState
NewGlobalStateForTest creates a new global state for test.
func (*GlobalReactorState) GetPatches ¶
func (s *GlobalReactorState) GetPatches() [][]DataPatch
GetPatches implements the ReactorState interface. Every []DataPatch slice in the returned [][]DataPatch holds the patches of one ChangefeedReactorState.
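Because each inner []DataPatch is committed as one ETCD txn, a group either takes effect as a whole or not at all. The sketch below imitates that with a scratch copy of an in-memory map. The patch type and the helpers commitGroups, set, and fail are hypothetical, and stopping at the first failing group is a choice made for this example rather than a statement about EtcdWorker's error handling.

```go
package main

import (
	"errors"
	"fmt"
)

// patch is a hypothetical, simplified stand-in for DataPatch.
type patch func(values map[string]string) error

// commitGroups applies each group to a scratch copy of the store and commits
// the copy only if every patch in the group succeeds, imitating one txn per group.
func commitGroups(store map[string]string, groups [][]patch) (int, error) {
	committed := 0
	for _, group := range groups {
		scratch := make(map[string]string, len(store))
		for k, v := range store {
			scratch[k] = v
		}
		for _, p := range group {
			if err := p(scratch); err != nil {
				return committed, err // store still holds the last committed state
			}
		}
		// "Commit": replace the store contents with the scratch copy.
		for k := range store {
			delete(store, k)
		}
		for k, v := range scratch {
			store[k] = v
		}
		committed++
	}
	return committed, nil
}

// set returns a patch that writes one key.
func set(key, value string) patch {
	return func(values map[string]string) error {
		values[key] = value
		return nil
	}
}

// fail is a patch that always errors, used to show that a group is all-or-nothing.
func fail(values map[string]string) error {
	return errors.New("patch rejected")
}

func demo() (map[string]string, int, error) {
	store := map[string]string{}
	groups := [][]patch{
		{set("/feed-a/status", "running")},
		{set("/feed-b/status", "running"), fail},
	}
	n, err := commitGroups(store, groups)
	return store, n, err
}

func main() {
	store, n, err := demo()
	fmt.Println(store, n, err) // prints "map[/feed-a/status:running] 1 patch rejected"
}
```

The write to /feed-b/status never reaches the store because it shares a group with the failing patch.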
func (*GlobalReactorState) SetOnCaptureAdded ¶
func (s *GlobalReactorState) SetOnCaptureAdded(f func(captureID model.CaptureID, addr string))
SetOnCaptureAdded registers a function that is called when a capture goes online.
func (*GlobalReactorState) SetOnCaptureRemoved ¶
func (s *GlobalReactorState) SetOnCaptureRemoved(f func(captureID model.CaptureID))
SetOnCaptureRemoved registers a function that is called when a capture goes offline.
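The capture callbacks follow a register-then-notify pattern: a state object remembers the functions it was given and invokes them when a capture key appears or disappears. Here is a minimal sketch of that pattern. The captureState type, the "/capture/" key layout, and the convention that a nil value means deletion are assumptions made for this example and do not reflect the real key format.

```go
package main

import (
	"fmt"
	"strings"
)

const capturePrefix = "/capture/" // hypothetical key layout, not the real one

// captureState tracks known captures and notifies registered callbacks.
type captureState struct {
	captures  map[string]string // captureID -> address
	onAdded   func(captureID, addr string)
	onRemoved func(captureID string)
}

// update mirrors one key change; a nil value means the key was deleted.
func (s *captureState) update(key string, value []byte) {
	if !strings.HasPrefix(key, capturePrefix) {
		return
	}
	id := strings.TrimPrefix(key, capturePrefix)
	if value == nil {
		if _, ok := s.captures[id]; ok {
			delete(s.captures, id)
			if s.onRemoved != nil {
				s.onRemoved(id)
			}
		}
		return
	}
	_, known := s.captures[id]
	s.captures[id] = string(value)
	if !known && s.onAdded != nil {
		s.onAdded(id, string(value))
	}
}

func demo() []string {
	var seen []string
	s := &captureState{captures: map[string]string{}}
	s.onAdded = func(id, addr string) { seen = append(seen, "added "+id+" "+addr) }
	s.onRemoved = func(id string) { seen = append(seen, "removed "+id) }

	s.update("/capture/c1", []byte("10.0.0.1:8300"))
	s.update("/capture/c1", []byte("10.0.0.1:8300")) // no callback: already known
	s.update("/capture/c1", nil)
	return seen
}

func main() {
	fmt.Println(demo()) // prints "[added c1 10.0.0.1:8300 removed c1]"
}
```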
func (*GlobalReactorState) UpdatePendingChange ¶
func (s *GlobalReactorState) UpdatePendingChange()
UpdatePendingChange implements the ReactorState interface
type Reactor ¶
type Reactor interface {
Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error)
}
Reactor is a stateful transform of states. It models Owner and Processor, which reacts according to updates in Etcd.
type ReactorState ¶
type ReactorState interface {
	// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
	Update(key util.EtcdKey, value []byte, isInit bool) error

	// UpdatePendingChange is called by EtcdWorker to notify the Reactor to apply the pending changes.
	UpdatePendingChange()

	// GetPatches is called by EtcdWorker, and should return many slices of data patches that represents the changes
	// that a Reactor wants to apply to Etcd.
	// a slice of DataPatch will be committed as one ETCD txn
	GetPatches() [][]DataPatch
}
ReactorState models the Etcd state of a reactor
type ReactorStateTester ¶
type ReactorStateTester struct {
// contains filtered or unexported fields
}
ReactorStateTester is a helper struct for unit-testing an implementer of ReactorState
func NewReactorStateTester ¶
func NewReactorStateTester(t *testing.T, state ReactorState, initKVEntries map[string]string) *ReactorStateTester
NewReactorStateTester creates a new ReactorStateTester
func (*ReactorStateTester) ApplyPatches ¶
func (r *ReactorStateTester) ApplyPatches() error
ApplyPatches calls the GetPatches method on the ReactorState and applies the changes to the mocked kv-store.
func (*ReactorStateTester) KVEntries ¶
func (r *ReactorStateTester) KVEntries() map[string]string
KVEntries returns the contents of the mocked KV store.
func (*ReactorStateTester) MustApplyPatches ¶
func (r *ReactorStateTester) MustApplyPatches()
MustApplyPatches calls ApplyPatches and requires it to succeed.
func (*ReactorStateTester) MustUpdate ¶
func (r *ReactorStateTester) MustUpdate(key string, value []byte)
MustUpdate calls Update and requires it to succeed.
type SingleDataPatch ¶
type SingleDataPatch struct {
	Key util.EtcdKey
	// Func should be a pure function that returns a new value given the old value.
	// The function is called each time the EtcdWorker initiates an Etcd transaction.
	Func func(old []byte) (newValue []byte, changed bool, err error)
}
SingleDataPatch represents an update to a given Etcd key
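The requirement that Func be pure matters because it may run more than once for the same logical update, so its result should depend only on the old value it is given. A rough sketch of applying such a single-key patch to a value map follows. The singlePatch type uses string keys for brevity, and the apply method and increment function are hypothetical; treating a nil new value as a deletion is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strconv"
)

// singlePatch mirrors the shape of SingleDataPatch, with string keys for brevity.
type singlePatch struct {
	Key  string
	Func func(old []byte) (newValue []byte, changed bool, err error)
}

// apply runs Func on the current value and records the key if it changed.
func (p singlePatch) apply(values map[string][]byte, changedSet map[string]struct{}) error {
	newValue, changed, err := p.Func(values[p.Key])
	if err != nil {
		return err
	}
	if !changed {
		return nil
	}
	if newValue == nil {
		delete(values, p.Key) // assumption: nil means "delete the key"
	} else {
		values[p.Key] = newValue
	}
	changedSet[p.Key] = struct{}{}
	return nil
}

// increment is pure: its result depends only on old, so calling it again
// with the same input yields the same output.
func increment(old []byte) ([]byte, bool, error) {
	n := 0
	if len(old) > 0 {
		var err error
		if n, err = strconv.Atoi(string(old)); err != nil {
			return nil, false, err
		}
	}
	return []byte(strconv.Itoa(n + 1)), true, nil
}

func demo() (string, int, error) {
	values := map[string][]byte{"/counter": []byte("7")}
	changed := map[string]struct{}{}
	p := singlePatch{Key: "/counter", Func: increment}
	if err := p.apply(values, changed); err != nil {
		return "", 0, err
	}
	return string(values["/counter"]), len(changed), nil
}

func main() {
	value, changed, err := demo()
	fmt.Println(value, changed, err) // prints "8 1 <nil>"
}
```

A function that captured and mutated outside state, for example a counter held in a closure, would give different results on a second call and so would not be safe to rerun.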