Documentation ¶
Index ¶
- Constants
- func GetEtcdKeyCaptureInfo(id string) string
- func GetEtcdKeyChangeFeedInfo(changefeedID string) string
- func GetEtcdKeyChangeFeedList() string
- func GetEtcdKeyJob(changeFeedID string) string
- func GetEtcdKeyTaskPosition(changefeedID, captureID string) string
- func GetEtcdKeyTaskStatus(changeFeedID, captureID string) string
- func InitMetrics(registry *prometheus.Registry)
- func SetupEmbedEtcd(dir string) (clientURL *url.URL, e *embed.Etcd, err error)
- type CDCEtcdClient
- func (c CDCEtcdClient) ClearAllCDCInfo(ctx context.Context) error
- func (c CDCEtcdClient) Close() error
- func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error
- func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, id string) error
- func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) error
- func (c CDCEtcdClient) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error)
- func (c CDCEtcdClient) GetAllChangeFeedInfo(ctx context.Context) (map[string]*model.ChangeFeedInfo, error)
- func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) (map[string]*model.ChangeFeedStatus, error)
- func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error)
- func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error)
- func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *model.CaptureInfo, err error)
- func (c CDCEtcdClient) GetCaptureLeases(ctx context.Context) (map[string]int64, error)
- func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error)
- func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id string) (*model.ChangeFeedInfo, error)
- func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error)
- func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) (int64, map[string]*mvccpb.KeyValue, error)
- func (c CDCEtcdClient) GetOwnerID(ctx context.Context, key string) (string, error)
- func (c CDCEtcdClient) GetOwnerRevision(ctx context.Context, captureID string) (rev int64, err error)
- func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error)
- func (c CDCEtcdClient) GetTaskPosition(ctx context.Context, changefeedID string, captureID string) (int64, *model.TaskPosition, error)
- func (c CDCEtcdClient) GetTaskStatus(ctx context.Context, changefeedID string, captureID string) (int64, *model.TaskStatus, error)
- func (c CDCEtcdClient) PutCaptureInfo(ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) PutChangeFeedStatus(ctx context.Context, changefeedID string, status *model.ChangeFeedStatus) error
- func (c CDCEtcdClient) PutTaskPositionOnChange(ctx context.Context, changefeedID string, captureID string, ...) (bool, error)
- func (c CDCEtcdClient) PutTaskStatus(ctx context.Context, changefeedID string, captureID string, ...) error
- func (c CDCEtcdClient) RevokeAllLeases(ctx context.Context, leases map[string]int64) error
- func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error
- type CDCKey
- type CDCKeyType
- type Client
- func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.DeleteResponse, err error)
- func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error)
- func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error)
- func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error)
- func (c *Client) RequestProgress(ctx context.Context) error
- func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error)
- func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption) (resp *clientv3.LeaseTimeToLiveResponse, err error)
- func (c *Client) Txn(ctx context.Context) clientv3.Txn
- func (c *Client) Unwrap() *clientv3.Client
- func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan
- func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, ...)
Constants ¶
const ( EtcdPut = "Put" EtcdGet = "Get" EtcdTxn = "Txn" EtcdDel = "Del" EtcdGrant = "Grant" EtcdRevoke = "Revoke" )
etcd operation names
const ( // CaptureOwnerKey is the capture owner path that is saved to etcd CaptureOwnerKey = EtcdKeyBase + "/owner" // CaptureInfoKeyPrefix is the capture info path that is saved to etcd CaptureInfoKeyPrefix = EtcdKeyBase + "/capture" // TaskKeyPrefix is the prefix of task keys TaskKeyPrefix = EtcdKeyBase + "/task" // TaskStatusKeyPrefix is the prefix of task status keys TaskStatusKeyPrefix = TaskKeyPrefix + "/status" // TaskPositionKeyPrefix is the prefix of task position keys TaskPositionKeyPrefix = TaskKeyPrefix + "/position" // JobKeyPrefix is the prefix of job keys JobKeyPrefix = EtcdKeyBase + "/job" )
const (
// EtcdKeyBase is the common prefix of the keys in CDC
EtcdKeyBase = "/tikv/cdc"
)
Variables ¶
This section is empty.
Functions ¶
func GetEtcdKeyCaptureInfo ¶
GetEtcdKeyCaptureInfo returns the key of a capture info
func GetEtcdKeyChangeFeedInfo ¶
GetEtcdKeyChangeFeedInfo returns the key of a changefeed config
func GetEtcdKeyChangeFeedList ¶
func GetEtcdKeyChangeFeedList() string
GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config
func GetEtcdKeyJob ¶
GetEtcdKeyJob returns the key for a job status
func GetEtcdKeyTaskPosition ¶
GetEtcdKeyTaskPosition returns the key of a task position
func GetEtcdKeyTaskStatus ¶
GetEtcdKeyTaskStatus returns the key for the task status
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers the etcd request counter.
Types ¶
type CDCEtcdClient ¶
type CDCEtcdClient struct {
Client *Client
}
CDCEtcdClient is a wrapper around the etcd client
func NewCDCEtcdClient ¶
func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client) CDCEtcdClient
NewCDCEtcdClient returns a new CDCEtcdClient
func (CDCEtcdClient) ClearAllCDCInfo ¶
func (c CDCEtcdClient) ClearAllCDCInfo(ctx context.Context) error
ClearAllCDCInfo deletes all keys created by CDC
func (CDCEtcdClient) Close ¶
func (c CDCEtcdClient) Close() error
Close releases resources in CDCEtcdClient
func (CDCEtcdClient) CreateChangefeedInfo ¶
func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error
CreateChangefeedInfo creates a change feed info in etcd and fails if it already exists.
func (CDCEtcdClient) DeleteCaptureInfo ¶
func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, id string) error
DeleteCaptureInfo deletes capture info from etcd.
func (CDCEtcdClient) DeleteChangeFeedInfo ¶
func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) error
DeleteChangeFeedInfo deletes a changefeed config from etcd
func (CDCEtcdClient) GetAllCDCInfo ¶
GetAllCDCInfo gets all keys created by CDC
func (CDCEtcdClient) GetAllChangeFeedInfo ¶
func (c CDCEtcdClient) GetAllChangeFeedInfo(ctx context.Context) (map[string]*model.ChangeFeedInfo, error)
GetAllChangeFeedInfo queries all changefeed information
func (CDCEtcdClient) GetAllChangeFeedStatus ¶
func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) (map[string]*model.ChangeFeedStatus, error)
GetAllChangeFeedStatus queries all changefeed job status
func (CDCEtcdClient) GetAllTaskPositions ¶
func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error)
GetAllTaskPositions queries all task positions of a changefeed, and returns a map mapping from captureID to TaskPositions
func (CDCEtcdClient) GetAllTaskStatus ¶
func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error)
GetAllTaskStatus queries all task status of a changefeed, and returns a map mapping from captureID to TaskStatus
func (CDCEtcdClient) GetCaptureInfo ¶
func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *model.CaptureInfo, err error)
GetCaptureInfo gets capture info from etcd. It returns errCaptureNotExist if the capture does not exist.
func (CDCEtcdClient) GetCaptureLeases ¶
GetCaptureLeases returns a map mapping from capture ID to its lease
func (CDCEtcdClient) GetCaptures ¶
func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error)
GetCaptures returns kv revision and CaptureInfo list
func (CDCEtcdClient) GetChangeFeedInfo ¶
func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id string) (*model.ChangeFeedInfo, error)
GetChangeFeedInfo queries the config of a given changefeed
func (CDCEtcdClient) GetChangeFeedStatus ¶
func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error)
GetChangeFeedStatus queries the checkpointTs and resolvedTs of a given changefeed
func (CDCEtcdClient) GetChangeFeeds ¶
func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) (int64, map[string]*mvccpb.KeyValue, error)
GetChangeFeeds returns kv revision and a map mapping from changefeedID to changefeed detail mvccpb.KeyValue
func (CDCEtcdClient) GetOwnerID ¶
GetOwnerID returns the owner id by querying etcd
func (CDCEtcdClient) GetOwnerRevision ¶
func (c CDCEtcdClient) GetOwnerRevision(ctx context.Context, captureID string) (rev int64, err error)
GetOwnerRevision gets the Etcd revision for the elected owner.
func (CDCEtcdClient) GetProcessors ¶
func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error)
GetProcessors queries all processors of the cdc cluster, and returns a slice of ProcInfoSnap (without table info)
func (CDCEtcdClient) GetTaskPosition ¶
func (c CDCEtcdClient) GetTaskPosition( ctx context.Context, changefeedID string, captureID string, ) (int64, *model.TaskPosition, error)
GetTaskPosition queries task position from etcd, and returns:
- ModRevision of the given key
- *model.TaskPosition unmarshaled from the value
- error if error happens
func (CDCEtcdClient) GetTaskStatus ¶
func (c CDCEtcdClient) GetTaskStatus( ctx context.Context, changefeedID string, captureID string, ) (int64, *model.TaskStatus, error)
GetTaskStatus queries task status from etcd, returns
- ModRevision of the given key
- *model.TaskStatus unmarshalled from the value
- error if error happens
func (CDCEtcdClient) PutCaptureInfo ¶
func (c CDCEtcdClient) PutCaptureInfo(ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID) error
PutCaptureInfo puts capture info into etcd.
func (CDCEtcdClient) PutChangeFeedStatus ¶
func (c CDCEtcdClient) PutChangeFeedStatus( ctx context.Context, changefeedID string, status *model.ChangeFeedStatus, ) error
PutChangeFeedStatus puts changefeed synchronization status into etcd
func (CDCEtcdClient) PutTaskPositionOnChange ¶
func (c CDCEtcdClient) PutTaskPositionOnChange( ctx context.Context, changefeedID string, captureID string, info *model.TaskPosition, ) (bool, error)
PutTaskPositionOnChange puts task position information into etcd if the task position value changes or the previous value does not exist in etcd. It returns true if the task position is written to etcd.
func (CDCEtcdClient) PutTaskStatus ¶
func (c CDCEtcdClient) PutTaskStatus( ctx context.Context, changefeedID string, captureID string, info *model.TaskStatus, ) error
PutTaskStatus puts task status into etcd.
func (CDCEtcdClient) RevokeAllLeases ¶
RevokeAllLeases revokes all leases passed from parameter
func (CDCEtcdClient) SaveChangeFeedInfo ¶
func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error
SaveChangeFeedInfo stores change feed info into etcd. TODO: this should be called from an outer system, such as from a TiDB client.
type CDCKey ¶
type CDCKey struct { Tp CDCKeyType ChangefeedID string CaptureID string OwnerLeaseID string }
CDCKey represents an etcd key which is defined by TiCDC
Usage: we can parse a raw etcd key:
```
k := new(CDCKey)
rawKey := "/tikv/cdc/changefeed/info/test/changefeed"
err := k.Parse(rawKey)
c.Assert(k, check.DeepEquals, &CDCKey{
	Tp:           CDCKeyTypeChangefeedInfo,
	ChangefeedID: "test/changefeed",
})
```
and we can generate a raw key from CDCKey
```
k := &CDCKey{
	Tp:           CDCKeyTypeChangefeedInfo,
	ChangefeedID: "test/changefeed",
}
c.Assert(k.String(), check.Equals, "/tikv/cdc/changefeed/info/test/changefeed")
```
type CDCKeyType ¶
type CDCKeyType = int
CDCKeyType is the type of etcd key
const ( CDCKeyTypeUnknown CDCKeyType = iota CDCKeyTypeOwner CDCKeyTypeCapture CDCKeyTypeChangefeedInfo CDCKeyTypeChangeFeedStatus CDCKeyTypeTaskPosition CDCKeyTypeTaskStatus CDCKeyTypeTaskWorkload )
the types of etcd key
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a simple wrapper that adds retry to etcd RPC
func (*Client) Delete ¶
func (c *Client) Delete( ctx context.Context, key string, opts ...clientv3.OpOption, ) (resp *clientv3.DeleteResponse, err error)
Delete delegates request to clientv3.KV.Delete
func (*Client) Get ¶
func (c *Client) Get( ctx context.Context, key string, opts ...clientv3.OpOption, ) (resp *clientv3.GetResponse, err error)
Get delegates request to clientv3.KV.Get
func (*Client) Grant ¶
func (c *Client) Grant( ctx context.Context, ttl int64, ) (resp *clientv3.LeaseGrantResponse, err error)
Grant delegates request to clientv3.Lease.Grant
func (*Client) Put ¶
func (c *Client) Put( ctx context.Context, key, val string, opts ...clientv3.OpOption, ) (resp *clientv3.PutResponse, err error)
Put delegates request to clientv3.KV.Put
func (*Client) RequestProgress ¶
RequestProgress requests a progress notify response be sent in all watch channels.
func (*Client) Revoke ¶
func (c *Client) Revoke( ctx context.Context, id clientv3.LeaseID, ) (resp *clientv3.LeaseRevokeResponse, err error)
Revoke delegates request to clientv3.Lease.Revoke
func (*Client) TimeToLive ¶
func (c *Client) TimeToLive( ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption, ) (resp *clientv3.LeaseTimeToLiveResponse, err error)
TimeToLive delegates request to clientv3.Lease.TimeToLive
func (*Client) Watch ¶
func (c *Client) Watch( ctx context.Context, key string, opts ...clientv3.OpOption, ) clientv3.WatchChan
Watch delegates request to clientv3.Watcher.Watch
func (*Client) WatchWithChan ¶
func (c *Client) WatchWithChan( ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption, )
WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh