Documentation ¶
Overview ¶
Package orchestrator mainly implements an ETCD worker. An ETCD worker reads data from and writes data to ETCD servers based on snapshots and data patches. The following describes how the ETCD worker works:
                     ETCD Servers
                        |       ^
                        |       |
           1. Watch     |       | 5. Txn
                        |       |
                        v       |
                      EtcdWorker
                        |       ^
                        |       |
           2. Update    |       | 4. DataPatch
               +--------+       +-------+
               |                        |
               |                        |
               v         3.Tick         |
          ReactorState ----------> Reactor

1. EtcdWorker watches the txn modification log from the ETCD servers.
2. EtcdWorker applies each modification received from the ETCD servers by calling the Update function of ReactorState.
3. EtcdWorker calls the Tick function of Reactor, and makes sure that the ReactorState is a consistent snapshot of the ETCD servers.
4. Reactor is implemented by the upper layer application. Usually, Reactor produces DataPatches when its Tick function is called. EtcdWorker applies all the DataPatches produced by Reactor.
5. EtcdWorker commits a txn to ETCD according to the DataPatches.
The upper layer application, which is the user of EtcdWorker, only needs to implement the Reactor and ReactorState interfaces. The ReactorState maintains the status of ETCD, and the Reactor produces different DataPatches depending on the ReactorState. EtcdWorker makes sure that any ReactorState seen by a Reactor is a consistent snapshot of the ETCD servers.
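The five steps above can be walked through with an in-memory map in place of the ETCD servers. The following is a minimal sketch, not the package's implementation: the interfaces are local stand-ins (string keys replace util.EtcdKey and Tick drops its context argument), and setPatch, counterState, counterReactor and runCounter are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"strconv"
)

// Stand-ins for the package's interfaces. String keys replace util.EtcdKey
// and Tick drops the context argument: both are simplifications.
type DataPatch interface {
	Patch(valueMap map[string][]byte, changedSet map[string]struct{}) error
}
type ReactorState interface {
	Update(key string, value []byte, isInit bool) error
	GetPatches() [][]DataPatch
}
type Reactor interface {
	Tick(state ReactorState) (ReactorState, error)
}

// setPatch is a hypothetical DataPatch that overwrites a single key.
type setPatch struct {
	key   string
	value []byte
}

func (p setPatch) Patch(valueMap map[string][]byte, changedSet map[string]struct{}) error {
	valueMap[p.key] = p.value
	changedSet[p.key] = struct{}{}
	return nil
}

// counterState is a hypothetical ReactorState mirroring the "/counter" key.
type counterState struct {
	value   int
	pending []DataPatch
}

func (s *counterState) Update(key string, value []byte, _ bool) (err error) {
	if key == "/counter" {
		s.value, err = strconv.Atoi(string(value))
	}
	return err
}

// GetPatches hands over the pending patches as one group and clears them.
func (s *counterState) GetPatches() [][]DataPatch {
	group := s.pending
	s.pending = nil
	return [][]DataPatch{group}
}

// counterReactor requests an increment until the counter reaches limit.
type counterReactor struct{ limit int }

func (r counterReactor) Tick(state ReactorState) (ReactorState, error) {
	s, ok := state.(*counterState)
	if !ok {
		return nil, fmt.Errorf("unexpected state type %T", state)
	}
	if s.value < r.limit {
		next := []byte(strconv.Itoa(s.value + 1))
		s.pending = append(s.pending, setPatch{key: "/counter", value: next})
	}
	return s, nil
}

// runCounter walks steps 1-5 with an in-memory map standing in for ETCD.
// The store starts empty, so the initial snapshot is a counter of zero.
func runCounter(limit, ticks int) (string, error) {
	store := map[string][]byte{}
	var state ReactorState = &counterState{}
	reactor := counterReactor{limit: limit}
	for i := 0; i < ticks; i++ {
		next, err := reactor.Tick(state) // step 3
		if err != nil {
			return "", err
		}
		state = next
		for _, group := range state.GetPatches() { // step 4
			changed := map[string]struct{}{}
			for _, p := range group {
				if err := p.Patch(store, changed); err != nil {
					return "", err
				}
			}
			// Step 5 would commit a txn; steps 1-2 replay it as watch events.
			for key := range changed {
				if err := state.Update(key, store[key], false); err != nil {
					return "", err
				}
			}
		}
	}
	return string(store["/counter"]), nil
}

func main() {
	fmt.Println(runCounter(3, 5))
}
```

The sketch writes patches straight into the map, whereas a real worker would commit them to ETCD in a transaction and learn about the result through its watch. The point is only the division of labour: the reactor decides, the state records, and the worker moves data in both directions.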
Index ¶
- func InitMetrics(registry *prometheus.Registry)
- type ChangefeedReactorState
- func (s *ChangefeedReactorState) Active(captureID model.CaptureID) bool
- func (s *ChangefeedReactorState) CheckCaptureAlive(captureID model.CaptureID)
- func (s *ChangefeedReactorState) CheckChangefeedNormal()
- func (s *ChangefeedReactorState) Exist() bool
- func (s *ChangefeedReactorState) GetPatches() [][]DataPatch
- func (s *ChangefeedReactorState) PatchInfo(fn func(*model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error))
- func (s *ChangefeedReactorState) PatchStatus(fn func(*model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error))
- func (s *ChangefeedReactorState) PatchTaskPosition(captureID model.CaptureID, ...)
- func (s *ChangefeedReactorState) PatchTaskStatus(captureID model.CaptureID, ...)
- func (s *ChangefeedReactorState) PatchTaskWorkload(captureID model.CaptureID, ...)
- func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error
- func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) error
- type DataPatch
- type EtcdWorker
- type GlobalReactorState
- func (s *GlobalReactorState) GetPatches() [][]DataPatch
- func (s *GlobalReactorState) SetOnCaptureAdded(f func(captureID model.CaptureID, addr string))
- func (s *GlobalReactorState) SetOnCaptureRemoved(f func(captureID model.CaptureID))
- func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error
- type MultiDataPatch
- type Reactor
- type ReactorState
- type ReactorStateTester
- type SingleDataPatch
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics in this file
Types ¶
type ChangefeedReactorState ¶
type ChangefeedReactorState struct {
	ID            model.ChangeFeedID
	Info          *model.ChangeFeedInfo
	Status        *model.ChangeFeedStatus
	TaskPositions map[model.CaptureID]*model.TaskPosition
	TaskStatuses  map[model.CaptureID]*model.TaskStatus
	Workloads     map[model.CaptureID]model.TaskWorkload
	// contains filtered or unexported fields
}
ChangefeedReactorState represents a changefeed state which stores all key-value pairs of a changefeed in ETCD
func NewChangefeedReactorState ¶
func NewChangefeedReactorState(id model.ChangeFeedID) *ChangefeedReactorState
NewChangefeedReactorState creates a new changefeed reactor state
func (*ChangefeedReactorState) Active ¶
func (s *ChangefeedReactorState) Active(captureID model.CaptureID) bool
Active returns true if the changefeed is ready to be processed
func (*ChangefeedReactorState) CheckCaptureAlive ¶
func (s *ChangefeedReactorState) CheckCaptureAlive(captureID model.CaptureID)
CheckCaptureAlive checks whether the capture is alive. If the capture is offline, the etcd worker exits with the ErrLeaseExpired error.
func (*ChangefeedReactorState) CheckChangefeedNormal ¶
func (s *ChangefeedReactorState) CheckChangefeedNormal()
CheckChangefeedNormal checks whether the changefeed state is runnable. If the changefeed is not runnable, the etcd worker skips all patches of this tick. The processor should call this function every tick to make sure the changefeed is runnable.
func (*ChangefeedReactorState) Exist ¶
func (s *ChangefeedReactorState) Exist() bool
Exist returns false if none of the keys of this changefeed exist in ETCD
func (*ChangefeedReactorState) GetPatches ¶
func (s *ChangefeedReactorState) GetPatches() [][]DataPatch
GetPatches implements the ReactorState interface
func (*ChangefeedReactorState) PatchInfo ¶
func (s *ChangefeedReactorState) PatchInfo(fn func(*model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error))
PatchInfo appends a DataPatch which can modify the ChangeFeedInfo
func (*ChangefeedReactorState) PatchStatus ¶
func (s *ChangefeedReactorState) PatchStatus(fn func(*model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error))
PatchStatus appends a DataPatch which can modify the ChangeFeedStatus
func (*ChangefeedReactorState) PatchTaskPosition ¶
func (s *ChangefeedReactorState) PatchTaskPosition(captureID model.CaptureID, fn func(*model.TaskPosition) (*model.TaskPosition, bool, error))
PatchTaskPosition appends a DataPatch which can modify the TaskPosition of a specified capture
func (*ChangefeedReactorState) PatchTaskStatus ¶
func (s *ChangefeedReactorState) PatchTaskStatus(captureID model.CaptureID, fn func(*model.TaskStatus) (*model.TaskStatus, bool, error))
PatchTaskStatus appends a DataPatch which can modify the TaskStatus of a specified capture
func (*ChangefeedReactorState) PatchTaskWorkload ¶
func (s *ChangefeedReactorState) PatchTaskWorkload(captureID model.CaptureID, fn func(model.TaskWorkload) (model.TaskWorkload, bool, error))
PatchTaskWorkload appends a DataPatch which can modify the TaskWorkload of a specified capture
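The Patch* methods above all take a typed function of the form fn(old) (new, changed, error). One plausible way to connect such a function to the raw bytes stored in ETCD is sketched below. It is an illustration under stated assumptions rather than the package's code: JSON is assumed as the encoding, a nil result is assumed to mean deletion, and position, typedPatch and advanceTo are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// position is a hypothetical stand-in for a model type such as model.TaskPosition.
type position struct {
	CheckpointTs uint64 `json:"checkpoint-ts"`
}

// typedPatch adapts a typed patch function to a byte-level one.
// JSON encoding is an assumption of this sketch.
func typedPatch[T any](fn func(*T) (*T, bool, error)) func(old []byte) ([]byte, bool, error) {
	return func(old []byte) ([]byte, bool, error) {
		var cur *T // stays nil when the key does not exist yet
		if old != nil {
			cur = new(T)
			if err := json.Unmarshal(old, cur); err != nil {
				return nil, false, err
			}
		}
		next, changed, err := fn(cur)
		if err != nil {
			return nil, false, err
		}
		if !changed {
			return old, false, nil
		}
		if next == nil {
			return nil, true, nil // assumed to mean "delete the key"
		}
		out, err := json.Marshal(next)
		if err != nil {
			return nil, false, err
		}
		return out, true, nil
	}
}

// advanceTo moves the checkpoint forward and reports no change otherwise.
func advanceTo(ts uint64) func(*position) (*position, bool, error) {
	return func(p *position) (*position, bool, error) {
		if p == nil {
			return &position{CheckpointTs: ts}, true, nil
		}
		if ts <= p.CheckpointTs {
			return p, false, nil
		}
		p.CheckpointTs = ts
		return p, true, nil
	}
}

func main() {
	patch := typedPatch(advanceTo(10))
	out, changed, err := patch([]byte(`{"checkpoint-ts":5}`))
	fmt.Println(string(out), changed, err)
}
```

Returning changed as false lets the caller skip the write entirely, which keeps a tick that has nothing new to report from producing an ETCD transaction.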
func (*ChangefeedReactorState) UpdateCDCKey ¶
func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) error
UpdateCDCKey updates the state by a parsed etcd key
type DataPatch ¶
type DataPatch interface {
	Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error
}
DataPatch represents an update of state
type EtcdWorker ¶
type EtcdWorker struct {
	// contains filtered or unexported fields
}
EtcdWorker handles all interactions with Etcd
func NewEtcdWorker ¶
func NewEtcdWorker(client *etcd.Client, prefix string, reactor Reactor, initState ReactorState) (*EtcdWorker, error)
NewEtcdWorker returns a new EtcdWorker
func (*EtcdWorker) Run ¶
func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error
Run starts the EtcdWorker event loop. A tick is generated either by a timer whose interval is timerInterval, or by an Etcd event. If the specified etcd session is Done, Run exits with cerrors.ErrEtcdSessionDone. It is safe to pass a nil session.
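The shape of such an event loop can be sketched with a select statement. This is a simplified illustration, not the body of Run: errSessionDone stands in for cerrors.ErrEtcdSessionDone, the event and session channels replace the etcd watch and session, and the nil channel trick for a missing session is an assumption about one way to get that behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errSessionDone is a stand-in for cerrors.ErrEtcdSessionDone.
var errSessionDone = errors.New("etcd session done")

// runLoop sketches the event loop: tick fires on every timer interval and on
// every event. A nil sessionDone channel is never ready in a select, which is
// one way to make a missing session safe (an assumption of this sketch).
func runLoop(ctx context.Context, events <-chan string, sessionDone <-chan struct{},
	interval time.Duration, tick func(reason string)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-sessionDone:
			return errSessionDone
		case <-ticker.C:
			tick("timer")
		case key := <-events:
			tick("event " + key)
		}
	}
}

// demo feeds one event (with a hypothetical key) into the loop, then ends the
// session from inside the tick.
func demo() ([]string, error) {
	events := make(chan string, 1)
	events <- "/changefeed/info"
	sessionDone := make(chan struct{})
	var ticks []string
	err := runLoop(context.Background(), events, sessionDone, time.Hour, func(reason string) {
		ticks = append(ticks, reason)
		close(sessionDone)
	})
	return ticks, err
}

// demoNilSession runs the loop without a session and stops it via the context.
func demoNilSession() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return runLoop(ctx, nil, nil, time.Hour, func(string) {})
}

func main() {
	fmt.Println(demo())
	fmt.Println(demoNilSession())
}
```

In the first demo the single event triggers one tick and the closed session channel then ends the loop. In the second, both the event and session channels are nil, so only the cancelled context can end it.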
type GlobalReactorState ¶
type GlobalReactorState struct {
	Owner       map[string]struct{}
	Captures    map[model.CaptureID]*model.CaptureInfo
	Changefeeds map[model.ChangeFeedID]*ChangefeedReactorState
	// contains filtered or unexported fields
}
GlobalReactorState represents a global state which stores all key-value pairs in ETCD
func NewGlobalState ¶
func NewGlobalState() *GlobalReactorState
NewGlobalState creates a new global state
func (*GlobalReactorState) GetPatches ¶
func (s *GlobalReactorState) GetPatches() [][]DataPatch
GetPatches implements the ReactorState interface. Each []DataPatch slice in the returned [][]DataPatch holds the patches of one ChangefeedReactorState.
func (*GlobalReactorState) SetOnCaptureAdded ¶
func (s *GlobalReactorState) SetOnCaptureAdded(f func(captureID model.CaptureID, addr string))
SetOnCaptureAdded registers a function that is called when a capture goes online.
func (*GlobalReactorState) SetOnCaptureRemoved ¶
func (s *GlobalReactorState) SetOnCaptureRemoved(f func(captureID model.CaptureID))
SetOnCaptureRemoved registers a function that is called when a capture goes offline.
type MultiDataPatch ¶
type MultiDataPatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error
MultiDataPatch represents an update to many keys
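A patch over many keys is useful when several keys have to change together. The sketch below mirrors the shape of MultiDataPatch with string keys in place of util.EtcdKey. The Patch method on the function type and the moveKey helper are assumptions made for illustration, not code taken from the package.

```go
package main

import "fmt"

// multiPatch mirrors the shape of MultiDataPatch with string keys standing in
// for util.EtcdKey. The Patch method, which lets the function type be used as
// a data patch, is an assumption of this sketch.
type multiPatch func(valueMap map[string][]byte, changedSet map[string]struct{}) error

func (m multiPatch) Patch(valueMap map[string][]byte, changedSet map[string]struct{}) error {
	return m(valueMap, changedSet)
}

// moveKey is a hypothetical patch touching two keys at once: it moves the
// value stored under from to the key to.
func moveKey(from, to string) multiPatch {
	return func(valueMap map[string][]byte, changedSet map[string]struct{}) error {
		value, ok := valueMap[from]
		if !ok {
			return fmt.Errorf("key %q does not exist", from)
		}
		valueMap[to] = value
		delete(valueMap, from)
		changedSet[from] = struct{}{}
		changedSet[to] = struct{}{}
		return nil
	}
}

func main() {
	values := map[string][]byte{"/task/old": []byte("payload")}
	changed := map[string]struct{}{}
	if err := moveKey("/task/old", "/task/new").Patch(values, changed); err != nil {
		fmt.Println("patch failed:", err)
		return
	}
	fmt.Println(string(values["/task/new"]), len(values), len(changed))
}
```

Both touched keys are recorded in changedSet, so a caller that builds a transaction from the set sees the removal and the creation together.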
type Reactor ¶
type Reactor interface {
	Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error)
}
Reactor is a stateful transform of states. It models Owner and Processor, which react according to updates in Etcd.
type ReactorState ¶
type ReactorState interface {
	// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
	Update(key util.EtcdKey, value []byte, isInit bool) error

	// GetPatches is called by EtcdWorker, and should return many slices of data patches that represents the changes
	// that a Reactor wants to apply to Etcd.
	// a slice of DataPatch will be committed as one ETCD txn
	GetPatches() [][]DataPatch
}
ReactorState models the Etcd state of a reactor
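Because each inner slice returned by GetPatches is committed as one ETCD txn, the grouping decides which changes land together. The sketch below imitates that with an in-memory map. It is a simplification under assumptions: patchFunc is a reduced stand-in for DataPatch, and dropping a group whose patch fails is this sketch's choice, not documented worker behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// patchFunc is a simplified stand-in for DataPatch that edits a string map.
type patchFunc func(values map[string]string) error

// commitGroups applies each group to a scratch copy of the store and swaps the
// copy in only when every patch of the group succeeded, imitating one
// transaction per group. Skipping a failed group is an assumption of this sketch.
func commitGroups(store map[string]string, groups [][]patchFunc) (map[string]string, int) {
	committed := 0
	for _, group := range groups {
		scratch := make(map[string]string, len(store))
		for k, v := range store {
			scratch[k] = v
		}
		ok := true
		for _, patch := range group {
			if err := patch(scratch); err != nil {
				ok = false
				break
			}
		}
		if ok {
			store = scratch
			committed++
		}
	}
	return store, committed
}

// set returns a hypothetical patch that writes one key.
func set(key, value string) patchFunc {
	return func(values map[string]string) error {
		values[key] = value
		return nil
	}
}

// fail is a hypothetical patch that always returns an error.
func fail(values map[string]string) error {
	return errors.New("patch rejected")
}

func demoGroups() (map[string]string, int) {
	groups := [][]patchFunc{
		{set("a", "1"), set("b", "2")}, // committed together
		{set("c", "3"), fail},          // dropped as a whole
	}
	return commitGroups(map[string]string{}, groups)
}

func main() {
	store, committed := demoGroups()
	fmt.Println(store, committed)
}
```

The second group never reaches the store even though its first patch succeeded, which is the all-or-nothing property a transaction per group provides.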
type ReactorStateTester ¶
type ReactorStateTester struct {
	// contains filtered or unexported fields
}
ReactorStateTester is a helper struct for unit-testing an implementer of ReactorState
func NewReactorStateTester ¶
func NewReactorStateTester(c *check.C, state ReactorState, initKVEntries map[string]string) *ReactorStateTester
NewReactorStateTester creates a new ReactorStateTester
func (*ReactorStateTester) ApplyPatches ¶
func (t *ReactorStateTester) ApplyPatches() error
ApplyPatches calls the GetPatches method on the ReactorState and applies the changes to the mocked kv-store.
func (*ReactorStateTester) KVEntries ¶
func (t *ReactorStateTester) KVEntries() map[string]string
KVEntries returns the contents of the mocked KV store.
func (*ReactorStateTester) MustApplyPatches ¶
func (t *ReactorStateTester) MustApplyPatches()
MustApplyPatches calls ApplyPatches and requires it to succeed
func (*ReactorStateTester) MustUpdate ¶
func (t *ReactorStateTester) MustUpdate(key string, value []byte)
MustUpdate calls Update and requires it to succeed
type SingleDataPatch ¶
type SingleDataPatch struct {
	Key util.EtcdKey
	// Func should be a pure function that returns a new value given the old value.
	// The function is called each time the EtcdWorker initiates an Etcd transaction.
	Func func(old []byte) (newValue []byte, changed bool, err error)
}
SingleDataPatch represents an update to a given Etcd key
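The requirement that Func be pure follows from its being called each time a transaction is initiated: the result may only depend on the old value. A minimal sketch of how such a patch could be applied to a value map is shown below. The singlePatch type uses a string key in place of util.EtcdKey, and treating a nil new value as a deletion is an assumption of this sketch; increment is a hypothetical example function.

```go
package main

import (
	"fmt"
	"strconv"
)

// singlePatch mirrors the shape of SingleDataPatch, with a string key
// standing in for util.EtcdKey.
type singlePatch struct {
	Key  string
	Func func(old []byte) (newValue []byte, changed bool, err error)
}

// Patch shows one plausible way to apply Func to the value map. Treating a nil
// newValue as a deletion is an assumption of this sketch.
func (p singlePatch) Patch(valueMap map[string][]byte, changedSet map[string]struct{}) error {
	newValue, changed, err := p.Func(valueMap[p.Key])
	if err != nil {
		return err
	}
	if !changed {
		return nil
	}
	changedSet[p.Key] = struct{}{}
	if newValue == nil {
		delete(valueMap, p.Key)
	} else {
		valueMap[p.Key] = newValue
	}
	return nil
}

// increment is pure: it derives the new value only from the old one, so it
// gives the same answer for the same input however often it is called.
func increment(old []byte) ([]byte, bool, error) {
	n := 0
	if old != nil {
		var err error
		if n, err = strconv.Atoi(string(old)); err != nil {
			return nil, false, err
		}
	}
	return []byte(strconv.Itoa(n + 1)), true, nil
}

func main() {
	values := map[string][]byte{"/hits": []byte("41")}
	changed := map[string]struct{}{}
	patch := singlePatch{Key: "/hits", Func: increment}
	if err := patch.Patch(values, changed); err != nil {
		fmt.Println("patch failed:", err)
		return
	}
	fmt.Println(string(values["/hits"]), len(changed))
}
```

A function that instead captured a counter from outside and bumped it on every call would give different results on a repeated attempt, which is exactly what the purity requirement rules out.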