etcd

package
v0.0.0-...-a345a4b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EtcdPut    = "Put"
	EtcdGet    = "Get"
	EtcdTxn    = "Txn"
	EtcdDel    = "Del"
	EtcdGrant  = "Grant"
	EtcdRevoke = "Revoke"
)

etcd operation names

View Source
const (
	// CaptureOwnerKey is the capture owner path that is saved to etcd
	CaptureOwnerKey = EtcdKeyBase + "/owner"
	// CaptureInfoKeyPrefix is the capture info path that is saved to etcd
	CaptureInfoKeyPrefix = EtcdKeyBase + "/capture"
	// TaskKeyPrefix is the prefix of task keys
	TaskKeyPrefix = EtcdKeyBase + "/task"
	// TaskStatusKeyPrefix is the prefix of task status keys
	TaskStatusKeyPrefix = TaskKeyPrefix + "/status"
	// TaskPositionKeyPrefix is the prefix of task position keys
	TaskPositionKeyPrefix = TaskKeyPrefix + "/position"
	// JobKeyPrefix is the prefix of job keys
	JobKeyPrefix = EtcdKeyBase + "/job"
)
View Source
const (
	// EtcdKeyBase is the common prefix of the keys in CDC
	EtcdKeyBase = "/tikv/cdc"
)

Variables

This section is empty.

Functions

func GetEtcdKeyCaptureInfo

func GetEtcdKeyCaptureInfo(id string) string

GetEtcdKeyCaptureInfo returns the key of a capture info

func GetEtcdKeyChangeFeedInfo

func GetEtcdKeyChangeFeedInfo(changefeedID string) string

GetEtcdKeyChangeFeedInfo returns the key of a changefeed config

func GetEtcdKeyChangeFeedList

func GetEtcdKeyChangeFeedList() string

GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config

func GetEtcdKeyJob

func GetEtcdKeyJob(changeFeedID string) string

GetEtcdKeyJob returns the key for a job status

func GetEtcdKeyTaskPosition

func GetEtcdKeyTaskPosition(changefeedID, captureID string) string

GetEtcdKeyTaskPosition returns the key of a task position

func GetEtcdKeyTaskStatus

func GetEtcdKeyTaskStatus(changeFeedID, captureID string) string

GetEtcdKeyTaskStatus returns the key for the task status

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers the etcd request counter.

func SetupEmbedEtcd

func SetupEmbedEtcd(dir string) (clientURL *url.URL, e *embed.Etcd, err error)

SetupEmbedEtcd starts an embed etcd server

Types

type CDCEtcdClient

type CDCEtcdClient struct {
	Client *Client
}

CDCEtcdClient is a wrap of etcd client

func NewCDCEtcdClient

func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client) CDCEtcdClient

NewCDCEtcdClient returns a new CDCEtcdClient

func (CDCEtcdClient) ClearAllCDCInfo

func (c CDCEtcdClient) ClearAllCDCInfo(ctx context.Context) error

ClearAllCDCInfo delete all keys created by CDC

func (CDCEtcdClient) Close

func (c CDCEtcdClient) Close() error

Close releases resources in CDCEtcdClient

func (CDCEtcdClient) CreateChangefeedInfo

func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error

CreateChangefeedInfo creates a change feed info into etcd and fails if it is already exists.

func (CDCEtcdClient) DeleteCaptureInfo

func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, id string) error

DeleteCaptureInfo delete capture info from etcd.

func (CDCEtcdClient) DeleteChangeFeedInfo

func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) error

DeleteChangeFeedInfo deletes a changefeed config from etcd

func (CDCEtcdClient) GetAllCDCInfo

func (c CDCEtcdClient) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error)

GetAllCDCInfo get all keys created by CDC

func (CDCEtcdClient) GetAllChangeFeedInfo

func (c CDCEtcdClient) GetAllChangeFeedInfo(ctx context.Context) (map[string]*model.ChangeFeedInfo, error)

GetAllChangeFeedInfo queries all changefeed information

func (CDCEtcdClient) GetAllChangeFeedStatus

func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) (map[string]*model.ChangeFeedStatus, error)

GetAllChangeFeedStatus queries all changefeed job status

func (CDCEtcdClient) GetAllTaskPositions

func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error)

GetAllTaskPositions queries all task positions of a changefeed, and returns a map mapping from captureID to TaskPositions

func (CDCEtcdClient) GetAllTaskStatus

func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error)

GetAllTaskStatus queries all task status of a changefeed, and returns a map mapping from captureID to TaskStatus

func (CDCEtcdClient) GetCaptureInfo

func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *model.CaptureInfo, err error)

GetCaptureInfo get capture info from etcd. return errCaptureNotExist if the capture not exists.

func (CDCEtcdClient) GetCaptureLeases

func (c CDCEtcdClient) GetCaptureLeases(ctx context.Context) (map[string]int64, error)

GetCaptureLeases returns a map mapping from capture ID to its lease

func (CDCEtcdClient) GetCaptures

func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error)

GetCaptures returns kv revision and CaptureInfo list

func (CDCEtcdClient) GetChangeFeedInfo

func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id string) (*model.ChangeFeedInfo, error)

GetChangeFeedInfo queries the config of a given changefeed

func (CDCEtcdClient) GetChangeFeedStatus

func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error)

GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed

func (CDCEtcdClient) GetChangeFeeds

func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) (int64, map[string]*mvccpb.KeyValue, error)

GetChangeFeeds returns kv revision and a map mapping from changefeedID to changefeed detail mvccpb.KeyValue

func (CDCEtcdClient) GetOwnerID

func (c CDCEtcdClient) GetOwnerID(ctx context.Context, key string) (string, error)

GetOwnerID returns the owner id by querying etcd

func (CDCEtcdClient) GetOwnerRevision

func (c CDCEtcdClient) GetOwnerRevision(ctx context.Context, captureID string) (rev int64, err error)

GetOwnerRevision gets the Etcd revision for the elected owner.

func (CDCEtcdClient) GetProcessors

func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error)

GetProcessors queries all processors of the cdc cluster, and returns a slice of ProcInfoSnap(without table info)

func (CDCEtcdClient) GetTaskPosition

func (c CDCEtcdClient) GetTaskPosition(
	ctx context.Context,
	changefeedID string,
	captureID string,
) (int64, *model.TaskPosition, error)

GetTaskPosition queries task process from etcd, returns

  • ModRevision of the given key
  • *model.TaskPosition unmarshaled from the value
  • error if error happens

func (CDCEtcdClient) GetTaskStatus

func (c CDCEtcdClient) GetTaskStatus(
	ctx context.Context,
	changefeedID string,
	captureID string,
) (int64, *model.TaskStatus, error)

GetTaskStatus queries task status from etcd, returns

  • ModRevision of the given key
  • *model.TaskStatus unmarshalled from the value
  • error if error happens

func (CDCEtcdClient) PutCaptureInfo

func (c CDCEtcdClient) PutCaptureInfo(ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID) error

PutCaptureInfo put capture info into etcd.

func (CDCEtcdClient) PutChangeFeedStatus

func (c CDCEtcdClient) PutChangeFeedStatus(
	ctx context.Context,
	changefeedID string,
	status *model.ChangeFeedStatus,
) error

PutChangeFeedStatus puts changefeed synchronization status into etcd

func (CDCEtcdClient) PutTaskPositionOnChange

func (c CDCEtcdClient) PutTaskPositionOnChange(
	ctx context.Context,
	changefeedID string,
	captureID string,
	info *model.TaskPosition,
) (bool, error)

PutTaskPositionOnChange puts task position information into etcd if the task position value changes or the presvious value does not exist in etcd. returns true if task position is written to etcd.

func (CDCEtcdClient) PutTaskStatus

func (c CDCEtcdClient) PutTaskStatus(
	ctx context.Context,
	changefeedID string,
	captureID string,
	info *model.TaskStatus,
) error

PutTaskStatus puts task status into etcd.

func (CDCEtcdClient) RevokeAllLeases

func (c CDCEtcdClient) RevokeAllLeases(ctx context.Context, leases map[string]int64) error

RevokeAllLeases revokes all leases passed from parameter

func (CDCEtcdClient) SaveChangeFeedInfo

func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error

SaveChangeFeedInfo stores change feed info into etcd TODO: this should be called from outer system, such as from a TiDB client

type CDCKey

type CDCKey struct {
	Tp           CDCKeyType
	ChangefeedID string
	CaptureID    string
	OwnerLeaseID string
}

CDCKey represents a etcd key which is defined by TiCDC

 Usage:
 we can parse a raw etcd key:
 ```
 	k := new(CDCKey)
 	rawKey := "/tikv/cdc/changefeed/info/test/changefeed"
	err := k.Parse(rawKey)
 	c.Assert(k, check.DeepEquals, &CDCKey{
			Tp:           CDCKeyTypeChangefeedInfo,
			ChangefeedID: "test/changefeed",
	})
 ```

 and we can generate a raw key from CDCKey
 ```
 	k := &CDCKey{
			Tp:           CDCKeyTypeChangefeedInfo,
			ChangefeedID: "test/changefeed",
	}
 	c.Assert(k.String(), check.Equals, "/tikv/cdc/changefeed/info/test/changefeed")
 ```

func (*CDCKey) Parse

func (k *CDCKey) Parse(key string) error

Parse parses the given etcd key

func (*CDCKey) String

func (k *CDCKey) String() string

type CDCKeyType

type CDCKeyType = int

CDCKeyType is the type of etcd key

const (
	CDCKeyTypeUnknown CDCKeyType = iota
	CDCKeyTypeOwner
	CDCKeyTypeCapture
	CDCKeyTypeChangefeedInfo
	CDCKeyTypeChangeFeedStatus
	CDCKeyTypeTaskPosition
	CDCKeyTypeTaskStatus
	CDCKeyTypeTaskWorkload
)

the types of etcd key

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a simple wrapper that adds retry to etcd RPC

func Wrap

func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client

Wrap warps a clientv3.Client that provides etcd APIs required by TiCDC.

func (*Client) Delete

func (c *Client) Delete(
	ctx context.Context, key string, opts ...clientv3.OpOption,
) (resp *clientv3.DeleteResponse, err error)

Delete delegates request to clientv3.KV.Delete

func (*Client) Get

func (c *Client) Get(
	ctx context.Context, key string, opts ...clientv3.OpOption,
) (resp *clientv3.GetResponse, err error)

Get delegates request to clientv3.KV.Get

func (*Client) Grant

func (c *Client) Grant(
	ctx context.Context, ttl int64,
) (resp *clientv3.LeaseGrantResponse, err error)

Grant delegates request to clientv3.Lease.Grant

func (*Client) Put

func (c *Client) Put(
	ctx context.Context, key, val string, opts ...clientv3.OpOption,
) (resp *clientv3.PutResponse, err error)

Put delegates request to clientv3.KV.Put

func (*Client) RequestProgress

func (c *Client) RequestProgress(ctx context.Context) error

RequestProgress requests a progress notify response be sent in all watch channels.

func (*Client) Revoke

func (c *Client) Revoke(
	ctx context.Context, id clientv3.LeaseID,
) (resp *clientv3.LeaseRevokeResponse, err error)

Revoke delegates request to clientv3.Lease.Revoke

func (*Client) TimeToLive

func (c *Client) TimeToLive(
	ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption,
) (resp *clientv3.LeaseTimeToLiveResponse, err error)

TimeToLive delegates request to clientv3.Lease.TimeToLive

func (*Client) Txn

func (c *Client) Txn(ctx context.Context) clientv3.Txn

Txn delegates request to clientv3.KV.Txn

func (*Client) Unwrap

func (c *Client) Unwrap() *clientv3.Client

Unwrap returns a clientv3.Client

func (*Client) Watch

func (c *Client) Watch(
	ctx context.Context, key string, opts ...clientv3.OpOption,
) clientv3.WatchChan

Watch delegates request to clientv3.Watcher.Watch

func (*Client) WatchWithChan

func (c *Client) WatchWithChan(
	ctx context.Context, outCh chan<- clientv3.WatchResponse,
	key string, opts ...clientv3.OpOption,
)

WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL