orchestrator

package
v0.0.0-...-1dd87cf
Warning

This package is not in the latest version of its module.

Published: Jan 4, 2025 License: Apache-2.0 Imports: 27 Imported by: 6

Documentation

Overview

Package orchestrator mainly implements an ETCD worker. An ETCD worker reads data from and writes data to ETCD servers based on a snapshot and data patches. The following describes in detail how the ETCD worker works:

			   ETCD Servers
				|       ^
				|       |
	   1. Watch |       | 5. Txn
				|       |
				v       |
			    EtcdWorker
				|       ^
				|       |
	   2. Update|       | 4. DataPatch
	   +--------+       +-------+
	   |                        |
	   |                        |
	   v         3.Tick         |
	ReactorState ----------> Reactor

 1. EtcdWorker watches the txn modification log from the ETCD servers.
 2. EtcdWorker applies each modification received from the ETCD servers by calling the Update function of ReactorState.
 3. EtcdWorker calls the Tick function of Reactor, and ensures that the ReactorState passed to it is a consistent snapshot of the ETCD servers.
 4. Reactor is implemented by the upper-layer application. Usually, Reactor produces DataPatches when its Tick function is called.
    EtcdWorker applies all the DataPatches produced by Reactor.
 5. EtcdWorker commits a txn to ETCD according to the DataPatches.

The upper-layer application, which is the user of EtcdWorker, only needs to implement the Reactor and ReactorState interfaces. The ReactorState maintains the status of ETCD, and the Reactor can produce different DataPatches according to the ReactorState. The EtcdWorker ensures that any ReactorState perceived by the Reactor is a consistent snapshot of the ETCD servers.
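
The five steps above can be sketched with an in-memory map standing in for the ETCD servers. This is a minimal illustration, not the package's implementation: the interfaces below are simplified local copies (Tick takes no context, UpdatePendingChange is omitted), and setPatch, counterState, incrementReactor, and runOnce are hypothetical names introduced only for this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// EtcdKey is a simplified stand-in for util.EtcdKey.
type EtcdKey string

// DataPatch has the same method shape as the package's DataPatch.
type DataPatch interface {
	Patch(valueMap map[EtcdKey][]byte, changedSet map[EtcdKey]struct{}) error
}

// ReactorState and Reactor are simplified local copies of the interfaces.
type ReactorState interface {
	Update(key EtcdKey, value []byte, isInit bool) error
	GetPatches() [][]DataPatch
}

type Reactor interface {
	Tick(state ReactorState) (ReactorState, error)
}

// setPatch (hypothetical) overwrites a single key.
type setPatch struct {
	key   EtcdKey
	value []byte
}

func (p setPatch) Patch(valueMap map[EtcdKey][]byte, changedSet map[EtcdKey]struct{}) error {
	valueMap[p.key] = p.value
	changedSet[p.key] = struct{}{}
	return nil
}

// counterState (hypothetical) mirrors the integer stored under "/counter".
type counterState struct {
	value   int
	pending []DataPatch
}

func (s *counterState) Update(key EtcdKey, value []byte, isInit bool) error {
	if key != "/counter" {
		return nil
	}
	n, err := strconv.Atoi(string(value))
	if err != nil {
		return err
	}
	s.value = n
	return nil
}

func (s *counterState) GetPatches() [][]DataPatch {
	if len(s.pending) == 0 {
		return nil
	}
	out := [][]DataPatch{s.pending}
	s.pending = nil
	return out
}

// incrementReactor (hypothetical) asks for counter+1 until limit is reached.
type incrementReactor struct{ limit int }

func (r incrementReactor) Tick(state ReactorState) (ReactorState, error) {
	s, ok := state.(*counterState)
	if !ok {
		return nil, fmt.Errorf("unexpected state type %T", state)
	}
	if s.value < r.limit {
		s.pending = append(s.pending, setPatch{"/counter", []byte(strconv.Itoa(s.value + 1))})
	}
	return s, nil
}

// runOnce performs steps 2 to 5 once against the in-memory store.
func runOnce(store map[EtcdKey][]byte, r Reactor, state ReactorState) (ReactorState, error) {
	for k, v := range store { // step 2: feed the snapshot into the state
		if err := state.Update(k, v, false); err != nil {
			return nil, err
		}
	}
	next, err := r.Tick(state) // step 3
	if err != nil {
		return nil, err
	}
	for _, batch := range next.GetPatches() { // steps 4 and 5: one "txn" per batch
		working := make(map[EtcdKey][]byte, len(store))
		for k, v := range store {
			working[k] = v
		}
		changed := map[EtcdKey]struct{}{}
		for _, p := range batch {
			if err := p.Patch(working, changed); err != nil {
				return nil, err
			}
		}
		for k := range changed {
			store[k] = working[k]
		}
	}
	return next, nil
}

func main() {
	store := map[EtcdKey][]byte{"/counter": []byte("0")}
	var state ReactorState = &counterState{}
	for i := 0; i < 5; i++ {
		next, err := runOnce(store, incrementReactor{limit: 3}, state)
		if err != nil {
			panic(err)
		}
		state = next
	}
	fmt.Println(string(store["/counter"])) // prints 3
}
```

The reactor never touches the store directly; it only describes changes as patches, which keeps the write path in one place.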

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics in this file

Types

type ChangefeedReactorState

type ChangefeedReactorState struct {
	ClusterID     string
	ID            model.ChangeFeedID
	Info          *model.ChangeFeedInfo
	Status        *model.ChangeFeedStatus
	TaskPositions map[model.CaptureID]*model.TaskPosition
	// contains filtered or unexported fields
}

ChangefeedReactorState represents a changefeed state which stores all key-value pairs of a changefeed in ETCD

func NewChangefeedReactorState

func NewChangefeedReactorState(clusterID string,
	id model.ChangeFeedID,
) *ChangefeedReactorState

NewChangefeedReactorState creates a new changefeed reactor state

func (*ChangefeedReactorState) Active

func (s *ChangefeedReactorState) Active(captureID model.CaptureID) bool

Active returns true if the changefeed is ready to be processed.

func (*ChangefeedReactorState) CheckCaptureAlive

func (s *ChangefeedReactorState) CheckCaptureAlive(captureID model.CaptureID)

CheckCaptureAlive checks whether the capture is alive. If the capture is offline, the etcd worker exits with the ErrLeaseExpired error.

func (*ChangefeedReactorState) CheckChangefeedNormal

func (s *ChangefeedReactorState) CheckChangefeedNormal()

CheckChangefeedNormal checks whether the changefeed state is runnable. If it is not, the etcd worker skips all patches of this tick. The processor should call this function every tick to make sure the changefeed is runnable.

func (*ChangefeedReactorState) CleanUpTaskPositions

func (s *ChangefeedReactorState) CleanUpTaskPositions()

CleanUpTaskPositions removes the task positions of the changefeed.

func (*ChangefeedReactorState) Exist

func (s *ChangefeedReactorState) Exist() bool

Exist returns false if none of this changefeed's keys exist in ETCD.

func (*ChangefeedReactorState) GetChangefeedInfo

func (s *ChangefeedReactorState) GetChangefeedInfo() *model.ChangeFeedInfo

GetChangefeedInfo returns the changefeed info.

func (*ChangefeedReactorState) GetChangefeedStatus

func (s *ChangefeedReactorState) GetChangefeedStatus() *model.ChangeFeedStatus

GetChangefeedStatus returns the changefeed status.

func (*ChangefeedReactorState) GetID

GetID returns the changefeed ID.

func (*ChangefeedReactorState) GetPatches

func (s *ChangefeedReactorState) GetPatches() [][]DataPatch

GetPatches implements the ReactorState interface

func (*ChangefeedReactorState) PatchInfo

PatchInfo appends a DataPatch which can modify the ChangeFeedInfo

func (*ChangefeedReactorState) PatchStatus

PatchStatus appends a DataPatch which can modify the ChangeFeedStatus

func (*ChangefeedReactorState) PatchTaskPosition

func (s *ChangefeedReactorState) PatchTaskPosition(captureID model.CaptureID, fn func(*model.TaskPosition) (*model.TaskPosition, bool, error))

PatchTaskPosition appends a DataPatch which can modify the TaskPosition of a specified capture
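
The callback passed to PatchTaskPosition works on a typed value, while a DataPatch ultimately works on raw bytes. One way such a typed callback could be adapted to a byte-level function is sketched below. The position struct, its JSON field name, and wrapTyped are hypothetical stand-ins for this example; the treatment of a nil result as a deletion is also an assumption, not documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// position is a hypothetical stand-in for model.TaskPosition.
type position struct {
	CheckpointTs uint64 `json:"checkpoint-ts"`
}

// wrapTyped adapts a typed patch function to a byte-level one by decoding
// the old value, calling fn, and encoding the result.
func wrapTyped(fn func(*position) (*position, bool, error)) func(old []byte) ([]byte, bool, error) {
	return func(old []byte) ([]byte, bool, error) {
		var cur *position // stays nil when the key does not exist yet
		if len(old) > 0 {
			cur = new(position)
			if err := json.Unmarshal(old, cur); err != nil {
				return nil, false, err
			}
		}
		next, changed, err := fn(cur)
		if err != nil || !changed {
			return nil, false, err
		}
		if next == nil {
			return nil, true, nil // assumption: a nil result requests deletion
		}
		data, err := json.Marshal(next)
		if err != nil {
			return nil, false, err
		}
		return data, true, nil
	}
}

func main() {
	// Advance the checkpoint to 10 unless it is already there or beyond.
	advance := wrapTyped(func(p *position) (*position, bool, error) {
		if p == nil {
			return &position{CheckpointTs: 10}, true, nil
		}
		if p.CheckpointTs >= 10 {
			return p, false, nil
		}
		p.CheckpointTs = 10
		return p, true, nil
	})

	out, changed, err := advance([]byte(`{"checkpoint-ts":5}`))
	fmt.Println(string(out), changed, err) // prints {"checkpoint-ts":10} true <nil>

	out, changed, err = advance([]byte(`{"checkpoint-ts":12}`))
	fmt.Println(out == nil, changed, err) // prints true false <nil>
}
```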

func (*ChangefeedReactorState) RemoveChangefeed

func (s *ChangefeedReactorState) RemoveChangefeed()

RemoveChangefeed removes the changefeed and cleans up its information and status.

func (*ChangefeedReactorState) ResumeChangefeed

func (s *ChangefeedReactorState) ResumeChangefeed(overwriteCheckpointTs uint64)

ResumeChangefeed resumes the changefeed and sets the checkpoint ts.

func (*ChangefeedReactorState) SetError

func (s *ChangefeedReactorState) SetError(lastError *model.RunningError)

SetError sets the error on the changefeed.

func (*ChangefeedReactorState) SetWarning

func (s *ChangefeedReactorState) SetWarning(lastError *model.RunningError)

SetWarning sets the warning on the changefeed.

func (*ChangefeedReactorState) TakeProcessorErrors

func (s *ChangefeedReactorState) TakeProcessorErrors() []*model.RunningError

TakeProcessorErrors returns the errors of the changefeed and clears them.

func (*ChangefeedReactorState) TakeProcessorWarnings

func (s *ChangefeedReactorState) TakeProcessorWarnings() []*model.RunningError

TakeProcessorWarnings returns the warnings of the changefeed and clears them.

func (*ChangefeedReactorState) Update

func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error

Update implements the ReactorState interface

func (*ChangefeedReactorState) UpdateCDCKey

func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) error

UpdateCDCKey updates the state by a parsed etcd key

func (*ChangefeedReactorState) UpdateChangefeedState

func (s *ChangefeedReactorState) UpdateChangefeedState(feedState model.FeedState,
	adminJobType model.AdminJobType,
	epoch uint64,
)

UpdateChangefeedState updates the changefeed with the given feed state, admin job type, and epoch. It has no return value.

func (*ChangefeedReactorState) UpdatePendingChange

func (s *ChangefeedReactorState) UpdatePendingChange()

UpdatePendingChange implements the ReactorState interface

type DataPatch

type DataPatch interface {
	Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error
}

DataPatch represents an update of state

type EtcdWorker

type EtcdWorker struct {
	// contains filtered or unexported fields
}

EtcdWorker handles all interactions with Etcd

func NewEtcdWorker

func NewEtcdWorker(
	client etcd.CDCEtcdClient,
	prefix string,
	reactor Reactor,
	initState ReactorState,
	migrator migrate.Migrator,
) (*EtcdWorker, error)

NewEtcdWorker returns a new EtcdWorker

func (*EtcdWorker) Run

func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, role string) error

Run starts the EtcdWorker event loop. A tick is generated either by a timer with interval timerInterval or by an Etcd event. If the specified etcd session is Done, Run exits with cerrors.ErrEtcdSessionDone. The session may be nil.

type GlobalReactorState

type GlobalReactorState struct {
	ClusterID   string
	Role        string
	Owner       map[string]struct{}
	Captures    map[model.CaptureID]*model.CaptureInfo
	Upstreams   map[model.UpstreamID]*model.UpstreamInfo
	Changefeeds map[model.ChangeFeedID]*ChangefeedReactorState
	// contains filtered or unexported fields
}

GlobalReactorState represents a global state which stores all key-value pairs in ETCD

func NewGlobalState

func NewGlobalState(clusterID string, captureSessionTTL int) *GlobalReactorState

NewGlobalState creates a new global state.

func NewGlobalStateForTest

func NewGlobalStateForTest(clusterID string) *GlobalReactorState

NewGlobalStateForTest creates a new global state for test.

func (*GlobalReactorState) GetPatches

func (s *GlobalReactorState) GetPatches() [][]DataPatch

GetPatches implements the ReactorState interface. Each []DataPatch slice in the returned [][]DataPatch holds the patches of one ChangefeedReactorState.

func (*GlobalReactorState) SetOnCaptureAdded

func (s *GlobalReactorState) SetOnCaptureAdded(f func(captureID model.CaptureID, addr string))

SetOnCaptureAdded registers a function that is called when a capture goes online.

func (*GlobalReactorState) SetOnCaptureRemoved

func (s *GlobalReactorState) SetOnCaptureRemoved(f func(captureID model.CaptureID))

SetOnCaptureRemoved registers a function that is called when a capture goes offline.
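
The two registration functions follow a common callback pattern: the state keeps a map of known captures and invokes the registered function when a capture appears or disappears. The sketch below shows that pattern in isolation. captureRegistry and its update method are hypothetical, and using an empty address to signal a deleted key is a simplification made for this example.

```go
package main

import "fmt"

// captureRegistry is a hypothetical, reduced model of the capture
// bookkeeping inside a global state.
type captureRegistry struct {
	captures  map[string]string // capture ID -> advertised address
	onAdded   func(captureID string, addr string)
	onRemoved func(captureID string)
}

func newCaptureRegistry() *captureRegistry {
	return &captureRegistry{captures: make(map[string]string)}
}

func (r *captureRegistry) SetOnCaptureAdded(f func(captureID string, addr string)) {
	r.onAdded = f
}

func (r *captureRegistry) SetOnCaptureRemoved(f func(captureID string)) {
	r.onRemoved = f
}

// update records a change to a capture key. An empty addr means the key
// was deleted. Callbacks fire only on real transitions.
func (r *captureRegistry) update(captureID, addr string) {
	_, known := r.captures[captureID]
	switch {
	case addr == "" && known:
		delete(r.captures, captureID)
		if r.onRemoved != nil {
			r.onRemoved(captureID)
		}
	case addr != "" && !known:
		r.captures[captureID] = addr
		if r.onAdded != nil {
			r.onAdded(captureID, addr)
		}
	case addr != "":
		r.captures[captureID] = addr // already known: refresh, no callback
	}
}

func main() {
	r := newCaptureRegistry()
	r.SetOnCaptureAdded(func(id, addr string) { fmt.Println("online:", id, addr) })
	r.SetOnCaptureRemoved(func(id string) { fmt.Println("offline:", id) })

	r.update("capture-1", "10.0.0.1:8300") // prints online: capture-1 10.0.0.1:8300
	r.update("capture-1", "10.0.0.1:8300") // no output, already known
	r.update("capture-1", "")              // prints offline: capture-1
}
```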

func (*GlobalReactorState) Update

func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error

Update implements the ReactorState interface

func (*GlobalReactorState) UpdatePendingChange

func (s *GlobalReactorState) UpdatePendingChange()

UpdatePendingChange implements the ReactorState interface

type Reactor

type Reactor interface {
	Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error)
}

Reactor is a stateful transform of states. It models Owner and Processor, which reacts according to updates in Etcd.

type ReactorState

type ReactorState interface {
	// Update is called by EtcdWorker to notify the Reactor of the latest change to the Etcd state.
	Update(key util.EtcdKey, value []byte, isInit bool) error

	// UpdatePendingChange is called by EtcdWorker to notify the Reactor to apply the pending changes.
	UpdatePendingChange()

	// GetPatches is called by EtcdWorker, and should return slices of data patches that represent the changes
	// that a Reactor wants to apply to Etcd.
	// Each slice of DataPatch is committed as one ETCD txn.
	GetPatches() [][]DataPatch
}

ReactorState models the Etcd state of a reactor

type ReactorStateTester

type ReactorStateTester struct {
	// contains filtered or unexported fields
}

ReactorStateTester is a helper struct for unit-testing an implementer of ReactorState

func NewReactorStateTester

func NewReactorStateTester(t *testing.T, state ReactorState, initKVEntries map[string]string) *ReactorStateTester

NewReactorStateTester creates a new ReactorStateTester

func (*ReactorStateTester) ApplyPatches

func (r *ReactorStateTester) ApplyPatches() error

ApplyPatches calls the GetPatches method on the ReactorState and applies the changes to the mocked kv-store.

func (*ReactorStateTester) KVEntries

func (r *ReactorStateTester) KVEntries() map[string]string

KVEntries returns the contents of the mocked KV store.

func (*ReactorStateTester) MustApplyPatches

func (r *ReactorStateTester) MustApplyPatches()

MustApplyPatches calls ApplyPatches and requires it to succeed.

func (*ReactorStateTester) MustUpdate

func (r *ReactorStateTester) MustUpdate(key string, value []byte)

MustUpdate calls Update and requires it to succeed.

func (*ReactorStateTester) Update

func (r *ReactorStateTester) Update(key string, value []byte) error

Update is used to update keys in the mocked kv-store.

type SingleDataPatch

type SingleDataPatch struct {
	Key util.EtcdKey
	// Func should be a pure function that returns a new value given the old value.
	// The function is called each time the EtcdWorker initiates an Etcd transaction.
	Func func(old []byte) (newValue []byte, changed bool, err error)
}

SingleDataPatch represents an update to a given Etcd key

func (*SingleDataPatch) Patch

func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error

Patch implements the DataPatch interface
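
The comment on Func says it should be pure because it is called each time a transaction is initiated, so it may run more than once against different old values. The sketch below shows how a Patch method of this shape could behave and why purity matters on a retry. singlePatch and increment are hypothetical local names, and deleting the key when the new value is nil is an assumption of this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// EtcdKey is a simplified stand-in for util.EtcdKey.
type EtcdKey string

// singlePatch mirrors the fields of SingleDataPatch for illustration.
type singlePatch struct {
	Key  EtcdKey
	Func func(old []byte) (newValue []byte, changed bool, err error)
}

// Patch computes the new value from the current one and records the key
// in changedSet only when Func reports a change.
func (s *singlePatch) Patch(valueMap map[EtcdKey][]byte, changedSet map[EtcdKey]struct{}) error {
	newValue, changed, err := s.Func(valueMap[s.Key])
	if err != nil {
		return err
	}
	if !changed {
		return nil
	}
	changedSet[s.Key] = struct{}{}
	if newValue == nil {
		delete(valueMap, s.Key) // assumption: nil means the key is removed
	} else {
		valueMap[s.Key] = newValue
	}
	return nil
}

// increment is pure: its result depends only on the old value.
func increment(old []byte) ([]byte, bool, error) {
	n := 0
	if len(old) > 0 {
		var err error
		if n, err = strconv.Atoi(string(old)); err != nil {
			return nil, false, err
		}
	}
	return []byte(strconv.Itoa(n + 1)), true, nil
}

func main() {
	p := &singlePatch{Key: "/epoch", Func: increment}

	// First attempt: the snapshot holds 7.
	first := map[EtcdKey][]byte{"/epoch": []byte("7")}
	if err := p.Patch(first, map[EtcdKey]struct{}{}); err != nil {
		panic(err)
	}
	fmt.Println(string(first["/epoch"])) // prints 8

	// Retry after a conflict: the snapshot now holds 9, and the same
	// patch still produces the right result.
	retry := map[EtcdKey][]byte{"/epoch": []byte("9")}
	if err := p.Patch(retry, map[EtcdKey]struct{}{}); err != nil {
		panic(err)
	}
	fmt.Println(string(retry["/epoch"])) // prints 10
}
```

A function that captured a precomputed value instead of reading old would write a stale result on the retry.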
