etcd

package
v0.0.0-...-686f8ea Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: Apache-2.0 Imports: 37 Imported by: 3

Documentation

Index

Constants

View Source
const (
	EtcdPut    = "Put"
	EtcdGet    = "Get"
	EtcdTxn    = "Txn"
	EtcdDel    = "Del"
	EtcdGrant  = "Grant"
	EtcdRevoke = "Revoke"
)

etcd operation names

View Source
const (

	// ChangefeedInfoKey is the key path for changefeed info
	ChangefeedInfoKey = "/changefeed/info"
	// ChangefeedStatusKey is the key path for changefeed status
	ChangefeedStatusKey = "/changefeed/status"

	// DeletionCounterKey is the key path for the counter of deleted keys
	DeletionCounterKey = metaPrefix + "/meta/ticdc-delete-etcd-key-count"

	// DefaultClusterAndNamespacePrefix is the default prefix of changefeed data
	DefaultClusterAndNamespacePrefix = "/tidb/cdc/default/default"
	// DefaultClusterAndMetaPrefix is the default prefix of cluster meta
	DefaultClusterAndMetaPrefix = "/tidb/cdc/default" + metaPrefix
)
View Source
const DefaultCDCClusterID = "default"

DefaultCDCClusterID is the default value of cdc cluster id

Variables

View Source
var (

	// TxnEmptyOpsElse is a no-op operation.
	TxnEmptyOpsElse = []clientV3.Op{}
)

Functions

func BaseKey

func BaseKey(clusterID string) string

BaseKey is the common prefix of the keys with cluster id in CDC

Note that tidb-dashboard depends on this prefix to filter out TiCDC keys. Ref: https://github.com/pingcap/tidb-dashboard/blob/1f39ee09c5352adbf23af8ec7e15020147ef9ca4/pkg/utils/topology/ticdc.go#L22

func CaptureInfoKeyPrefix

func CaptureInfoKeyPrefix(clusterID string) string

CaptureInfoKeyPrefix is the capture info path that is saved to etcd

func CaptureOwnerKey

func CaptureOwnerKey(clusterID string) string

CaptureOwnerKey is the capture owner path that is saved to etcd

func ChangefeedStatusKeyPrefix

func ChangefeedStatusKeyPrefix(clusterID, namespace string) string

ChangefeedStatusKeyPrefix is the prefix of changefeed status keys

func CreateRawEtcdClient

func CreateRawEtcdClient(securityConf *security.Credential, grpcDialOption grpc.DialOption, endpoints ...string) (*clientv3.Client, error)

CreateRawEtcdClient creates an etcd v3 client with endpoint detection. It will check the health of endpoints periodically, and update endpoints if needed.

func GcServiceIDForTest

func GcServiceIDForTest() string

GcServiceIDForTest returns the gc service ID for tests

func GetEtcdKeyCaptureInfo

func GetEtcdKeyCaptureInfo(clusterID, id string) string

GetEtcdKeyCaptureInfo returns the key of a capture info

func GetEtcdKeyChangeFeedInfo

func GetEtcdKeyChangeFeedInfo(clusterID string, changefeedID model.ChangeFeedID) string

GetEtcdKeyChangeFeedInfo returns the key of a changefeed config

func GetEtcdKeyChangeFeedList

func GetEtcdKeyChangeFeedList(clusterID, namespace string) string

GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config

func GetEtcdKeyJob

func GetEtcdKeyJob(clusterID string, changeFeedID model.ChangeFeedID) string

GetEtcdKeyJob returns the key for a job status

func GetEtcdKeyTaskPosition

func GetEtcdKeyTaskPosition(clusterID string,
	changefeedID model.ChangeFeedID,
	captureID string,
) string

GetEtcdKeyTaskPosition returns the key of a task position

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers the etcd request counter.

func IsHealthy

func IsHealthy(ctx context.Context, client *clientv3.Client) bool

IsHealthy checks if the etcd is healthy.

func MigrateBackupKey

func MigrateBackupKey(version int, backupKey string) string

MigrateBackupKey is the key of backup data during a migration.

func NamespacedPrefix

func NamespacedPrefix(clusterID, namespace string) string

NamespacedPrefix returns the etcd prefix of changefeed data

func SetupEmbedEtcd

func SetupEmbedEtcd(dir string) (clientURL *url.URL, e *embed.Etcd, err error)

SetupEmbedEtcd starts an embedded etcd server

func TaskPositionKeyPrefix

func TaskPositionKeyPrefix(clusterID, namespace string) string

TaskPositionKeyPrefix is the prefix of task position keys

Types

type CDCEtcdClient

type CDCEtcdClient interface {
	OwnerCaptureInfoClient

	GetClusterID() string

	GetEtcdClient() *Client

	GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error)

	GetChangeFeedInfo(ctx context.Context,
		id model.ChangeFeedID,
	) (*model.ChangeFeedInfo, error)

	GetAllChangeFeedInfo(ctx context.Context) (
		map[model.ChangeFeedID]*model.ChangeFeedInfo, error,
	)

	GetChangeFeedStatus(ctx context.Context,
		id model.ChangeFeedID,
	) (*model.ChangeFeedStatus, int64, error)

	GetUpstreamInfo(ctx context.Context,
		upstreamID model.UpstreamID,
		namespace string,
	) (*model.UpstreamInfo, error)

	GetGCServiceID() string

	GetEnsureGCServiceID(tag string) string

	SaveChangeFeedInfo(ctx context.Context,
		info *model.ChangeFeedInfo,
		changeFeedID model.ChangeFeedID,
	) error

	CreateChangefeedInfo(context.Context,
		*model.UpstreamInfo,
		*model.ChangeFeedInfo,
	) error

	UpdateChangefeedAndUpstream(ctx context.Context,
		upstreamInfo *model.UpstreamInfo,
		changeFeedInfo *model.ChangeFeedInfo,
	) error

	PutCaptureInfo(context.Context, *model.CaptureInfo, clientv3.LeaseID) error

	DeleteCaptureInfo(context.Context, model.CaptureID) error

	CheckMultipleCDCClusterExist(ctx context.Context) error
}

CDCEtcdClient extracts the methods of CDCEtcdClientImpl that are used by apiv2.

type CDCEtcdClientImpl

type CDCEtcdClientImpl struct {
	Client    *Client
	ClusterID string
	// contains filtered or unexported fields
}

CDCEtcdClientImpl is a wrapper around the etcd client

func NewCDCEtcdClient

func NewCDCEtcdClient(ctx context.Context,
	cli *clientv3.Client,
	clusterID string,
) (*CDCEtcdClientImpl, error)

NewCDCEtcdClient returns a new CDCEtcdClient

func (*CDCEtcdClientImpl) CheckMultipleCDCClusterExist

func (c *CDCEtcdClientImpl) CheckMultipleCDCClusterExist(ctx context.Context) error

CheckMultipleCDCClusterExist checks whether other CDC clusters exist and returns an error if so; in that case the user should use --server instead

func (*CDCEtcdClientImpl) ClearAllCDCInfo

func (c *CDCEtcdClientImpl) ClearAllCDCInfo(ctx context.Context) error

ClearAllCDCInfo deletes all keys created by CDC

func (*CDCEtcdClientImpl) Close

func (c *CDCEtcdClientImpl) Close() error

Close releases resources in CDCEtcdClient

func (*CDCEtcdClientImpl) CreateChangefeedInfo

func (c *CDCEtcdClientImpl) CreateChangefeedInfo(
	ctx context.Context, upstreamInfo *model.UpstreamInfo, info *model.ChangeFeedInfo,
) error

CreateChangefeedInfo creates a changefeed info in etcd and fails if it already exists.

func (*CDCEtcdClientImpl) DeleteCaptureInfo

func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, captureID string) error

DeleteCaptureInfo deletes all capture-related info from etcd.

func (*CDCEtcdClientImpl) DeleteChangeFeedInfo

func (c *CDCEtcdClientImpl) DeleteChangeFeedInfo(ctx context.Context,
	id model.ChangeFeedID,
) error

DeleteChangeFeedInfo deletes a changefeed config from etcd

func (*CDCEtcdClientImpl) GetAllCDCInfo

func (c *CDCEtcdClientImpl) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error)

GetAllCDCInfo gets all keys created by CDC

func (*CDCEtcdClientImpl) GetAllChangeFeedInfo

func (c *CDCEtcdClientImpl) GetAllChangeFeedInfo(ctx context.Context) (
	map[model.ChangeFeedID]*model.ChangeFeedInfo, error,
)

GetAllChangeFeedInfo queries all changefeed information

func (*CDCEtcdClientImpl) GetCaptureInfo

func (c *CDCEtcdClientImpl) GetCaptureInfo(
	ctx context.Context, id string,
) (info *model.CaptureInfo, err error)

GetCaptureInfo gets capture info from etcd. It returns ErrCaptureNotExist if the capture does not exist.

func (*CDCEtcdClientImpl) GetCaptureLeases

func (c *CDCEtcdClientImpl) GetCaptureLeases(ctx context.Context) (map[string]int64, error)

GetCaptureLeases returns a map mapping from capture ID to its lease

func (*CDCEtcdClientImpl) GetCaptures

func (c *CDCEtcdClientImpl) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error)

GetCaptures returns kv revision and CaptureInfo list

func (*CDCEtcdClientImpl) GetChangeFeedInfo

func (c *CDCEtcdClientImpl) GetChangeFeedInfo(ctx context.Context,
	id model.ChangeFeedID,
) (*model.ChangeFeedInfo, error)

GetChangeFeedInfo queries the config of a given changefeed

func (*CDCEtcdClientImpl) GetChangeFeedStatus

func (c *CDCEtcdClientImpl) GetChangeFeedStatus(ctx context.Context,
	id model.ChangeFeedID,
) (*model.ChangeFeedStatus, int64, error)

GetChangeFeedStatus queries the checkpointTs and resolvedTs of a given changefeed

func (*CDCEtcdClientImpl) GetChangeFeeds

func (c *CDCEtcdClientImpl) GetChangeFeeds(ctx context.Context) (
	int64,
	map[model.ChangeFeedID]*mvccpb.KeyValue, error,
)

GetChangeFeeds returns kv revision and a map mapping from changefeedID to changefeed detail mvccpb.KeyValue

func (*CDCEtcdClientImpl) GetClusterID

func (c *CDCEtcdClientImpl) GetClusterID() string

GetClusterID gets CDC cluster ID.

func (*CDCEtcdClientImpl) GetEnsureGCServiceID

func (c *CDCEtcdClientImpl) GetEnsureGCServiceID(tag string) string

GetEnsureGCServiceID returns the prefix for the gc service id when changefeed is creating

func (*CDCEtcdClientImpl) GetEtcdClient

func (c *CDCEtcdClientImpl) GetEtcdClient() *Client

GetEtcdClient gets Client.

func (*CDCEtcdClientImpl) GetGCServiceID

func (c *CDCEtcdClientImpl) GetGCServiceID() string

GetGCServiceID returns the cdc gc service ID

func (*CDCEtcdClientImpl) GetOwnerID

func (c *CDCEtcdClientImpl) GetOwnerID(ctx context.Context) (string, error)

GetOwnerID returns the owner id by querying etcd

func (*CDCEtcdClientImpl) GetOwnerRevision

func (c *CDCEtcdClientImpl) GetOwnerRevision(
	ctx context.Context, captureID string,
) (rev int64, err error)

GetOwnerRevision gets the Etcd revision for the elected owner.

func (*CDCEtcdClientImpl) GetUpstreamInfo

func (c *CDCEtcdClientImpl) GetUpstreamInfo(ctx context.Context,
	upstreamID model.UpstreamID,
	namespace string,
) (*model.UpstreamInfo, error)

GetUpstreamInfo gets an upstreamInfo from the etcd server

func (*CDCEtcdClientImpl) PutCaptureInfo

func (c *CDCEtcdClientImpl) PutCaptureInfo(
	ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID,
) error

PutCaptureInfo puts capture info into etcd; this happens when the capture starts.

func (*CDCEtcdClientImpl) RevokeAllLeases

func (c *CDCEtcdClientImpl) RevokeAllLeases(ctx context.Context, leases map[string]int64) error

RevokeAllLeases revokes all leases passed in via the leases parameter

func (*CDCEtcdClientImpl) SaveChangeFeedInfo

func (c *CDCEtcdClientImpl) SaveChangeFeedInfo(ctx context.Context,
	info *model.ChangeFeedInfo,
	changeFeedID model.ChangeFeedID,
) error

SaveChangeFeedInfo stores change feed info into etcd. TODO: this should be called from an outer system, such as from a TiDB client

func (*CDCEtcdClientImpl) UpdateChangefeedAndUpstream

func (c *CDCEtcdClientImpl) UpdateChangefeedAndUpstream(
	ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo,
) error

UpdateChangefeedAndUpstream updates the changefeed's info and its upstream info into etcd

type CDCKey

type CDCKey struct {
	Tp           CDCKeyType
	ChangefeedID model.ChangeFeedID
	CaptureID    string
	OwnerLeaseID string
	ClusterID    string
	UpstreamID   model.UpstreamID
	Namespace    string
}

CDCKey represents an etcd key which is defined by TiCDC

 Usage:
 we can parse a raw etcd key:
 ```
 	k := new(CDCKey)
 	rawKey := "/tidb/cdc/changefeed/info/test/changefeed"
	err := k.Parse(clusterID, rawKey)
 	c.Assert(k, check.DeepEquals, &CDCKey{
			Tp:           CDCKeyTypeChangefeedInfo,
			ChangefeedID: "test/changefeed",
	})
 ```

 and we can generate a raw key from CDCKey
 ```
 	k := &CDCKey{
			Tp:           CDCKeyTypeChangefeedInfo,
			ChangefeedID: "test/changefeed",
	}
 	c.Assert(k.String(), check.Equals, "/tidb/cdc/changefeed/info/test/changefeed")
 ```

func (*CDCKey) Parse

func (k *CDCKey) Parse(clusterID, key string) error

Parse parses the given etcd key

func (*CDCKey) String

func (k *CDCKey) String() string

type CDCKeyType

type CDCKeyType = int

CDCKeyType is the type of etcd key

const (
	CDCKeyTypeUnknown CDCKeyType = iota
	CDCKeyTypeOwner
	CDCKeyTypeCapture
	CDCKeyTypeChangefeedInfo
	CDCKeyTypeChangeFeedStatus
	CDCKeyTypeTaskPosition
	CDCKeyTypeMetaVersion
	CDCKeyTypeUpStream
)

the types of etcd key

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a simple wrapper that adds retry to etcd RPC

func Wrap

func Wrap(cli *clientV3.Client, metrics map[string]prometheus.Counter) *Client

Wrap wraps a clientV3.Client that provides etcd APIs required by TiCDC.

func (*Client) Delete

func (c *Client) Delete(
	ctx context.Context, key string, opts ...clientV3.OpOption,
) (resp *clientV3.DeleteResponse, err error)

Delete delegates request to clientV3.KV.Delete

func (*Client) Get

func (c *Client) Get(
	ctx context.Context, key string, opts ...clientV3.OpOption,
) (resp *clientV3.GetResponse, err error)

Get delegates request to clientV3.KV.Get

func (*Client) Grant

func (c *Client) Grant(
	ctx context.Context, ttl int64,
) (resp *clientV3.LeaseGrantResponse, err error)

Grant delegates request to clientV3.Lease.Grant

func (*Client) Put

func (c *Client) Put(
	ctx context.Context, key, val string, opts ...clientV3.OpOption,
) (resp *clientV3.PutResponse, err error)

Put delegates request to clientV3.KV.Put

func (*Client) RequestProgress

func (c *Client) RequestProgress(ctx context.Context) error

RequestProgress requests a progress notify response be sent in all watch channels.

func (*Client) Revoke

func (c *Client) Revoke(
	ctx context.Context, id clientV3.LeaseID,
) (resp *clientV3.LeaseRevokeResponse, err error)

Revoke delegates request to clientV3.Lease.Revoke

func (*Client) TimeToLive

func (c *Client) TimeToLive(
	ctx context.Context, lease clientV3.LeaseID, opts ...clientV3.LeaseOption,
) (resp *clientV3.LeaseTimeToLiveResponse, err error)

TimeToLive delegates request to clientV3.Lease.TimeToLive

func (*Client) Txn

func (c *Client) Txn(
	ctx context.Context, cmps []clientV3.Cmp, opsThen, opsElse []clientV3.Op,
) (resp *clientV3.TxnResponse, err error)

Txn delegates request to clientV3.KV.Txn. The error returned can only be a non-retryable error, such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.

func (*Client) Unwrap

func (c *Client) Unwrap() *clientV3.Client

Unwrap returns a clientV3.Client

func (*Client) Watch

func (c *Client) Watch(
	ctx context.Context, key string, role string, opts ...clientV3.OpOption,
) clientV3.WatchChan

Watch delegates request to clientV3.Watcher.Watch

func (*Client) WatchWithChan

func (c *Client) WatchWithChan(
	ctx context.Context, outCh chan<- clientV3.WatchResponse,
	key string, role string, opts ...clientV3.OpOption,
)

WatchWithChan maintains a watchCh and sends all messages from the watchCh to outCh

type OwnerCaptureInfoClient

type OwnerCaptureInfoClient interface {
	GetOwnerID(context.Context) (model.CaptureID, error)

	GetOwnerRevision(context.Context, model.CaptureID) (int64, error)

	GetCaptures(context.Context) (int64, []*model.CaptureInfo, error)
}

OwnerCaptureInfoClient is the sub-interface of CDCEtcdClient that is used to get owner and capture information

type Tester

type Tester struct {
	ClientURL *url.URL
	// contains filtered or unexported fields
}

Tester is for unit tests

func (*Tester) SetUpTest

func (s *Tester) SetUpTest(t *testing.T)

SetUpTest sets up the etcd tester

func (*Tester) TearDownTest

func (s *Tester) TearDownTest(t *testing.T)

TearDownTest tears down etcd

Directories

Path Synopsis
Package mock_etcd is a generated GoMock package.
Package mock_etcd is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL